xref: /qemu/job.c (revision c302660920acf48425a1317d56122ea8af60fbc4)
1  /*
2   * Background jobs (long-running operations)
3   *
4   * Copyright (c) 2011 IBM Corp.
5   * Copyright (c) 2012, 2018 Red Hat, Inc.
6   *
7   * Permission is hereby granted, free of charge, to any person obtaining a copy
8   * of this software and associated documentation files (the "Software"), to deal
9   * in the Software without restriction, including without limitation the rights
10   * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11   * copies of the Software, and to permit persons to whom the Software is
12   * furnished to do so, subject to the following conditions:
13   *
14   * The above copyright notice and this permission notice shall be included in
15   * all copies or substantial portions of the Software.
16   *
17   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20   * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23   * THE SOFTWARE.
24   */
25  
26  #include "qemu/osdep.h"
27  #include "qapi/error.h"
28  #include "qemu/job.h"
29  #include "qemu/id.h"
30  #include "qemu/main-loop.h"
31  #include "block/aio-wait.h"
32  #include "trace/trace-root.h"
33  #include "qapi/qapi-events-job.h"
34  
35  /*
36   * The job API is composed of two categories of functions.
37   *
38   * The first includes functions used by the monitor.  The monitor is
39   * peculiar in that it accesses the job list with job_get, and
40   * therefore needs consistency across job_get and the actual operation
41   * (e.g. job_user_cancel). To achieve this consistency, the caller
42   * calls job_lock/job_unlock itself around the whole operation.
 *
45   * The second includes functions used by the job drivers and sometimes
46   * by the core block layer. These delegate the locking to the callee instead.
47   */
48  
/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 */
QemuMutex job_mutex;

/* All jobs currently in existence.  Protected by job_mutex. */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
57  
/*
 * Job State Transition Table: JobSTT[from][to] is true iff a job may
 * move from status 'from' to status 'to'.  Enforced by
 * job_state_transition_locked().
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
73  
/*
 * JobVerbTable[verb][status] is true iff the user-initiated @verb is
 * permitted while the job is in @status.  Enforced by
 * job_apply_verb_locked().
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
    [JOB_VERB_CHANGE]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
};
85  
/*
 * Transactional group of jobs: member jobs are completed or aborted
 * as a unit.  Fields are protected by job_mutex.
 */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs belonging to this transaction (linked via Job.txn_list) */
    QLIST_HEAD(, Job) jobs;

    /* Reference count; the struct is freed when it drops to zero */
    int refcnt;
};
98  
/* Acquire the global job_mutex; pair with job_unlock(). */
void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}
103  
/* Release the global job_mutex acquired with job_lock(). */
void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}
108  
/* Initialize job_mutex before main() runs (GCC/Clang constructor). */
static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}
113  
/*
 * Allocate a new, empty transaction.  The caller owns the initial
 * reference and releases it with job_txn_unref().
 */
JobTxn *job_txn_new(void)
{
    JobTxn *txn = g_new0(JobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}
121  
/* Take an additional reference to @txn.  Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}
127  
/*
 * Drop one reference to @txn (NULL is accepted and ignored); the
 * transaction is freed once the last reference is gone.
 * Called with job_mutex held.
 */
void job_txn_unref_locked(JobTxn *txn)
{
    if (!txn) {
        return;
    }
    txn->refcnt--;
    if (txn->refcnt == 0) {
        g_free(txn);
    }
}
134  
/* Unlocked wrapper around job_txn_unref_locked(). */
void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}
140  
141  /**
142   * @txn: The transaction (may be NULL)
143   * @job: Job to add to the transaction
144   *
145   * Add @job to the transaction.  The @job must not already be in a transaction.
146   * The caller must call either job_txn_unref() or job_completed() to release
147   * the reference that is automatically grabbed here.
148   *
149   * If @txn is NULL, the function does nothing.
150   *
151   * Called with job_mutex held.
152   */
job_txn_add_job_locked(JobTxn * txn,Job * job)153  static void job_txn_add_job_locked(JobTxn *txn, Job *job)
154  {
155      if (!txn) {
156          return;
157      }
158  
159      assert(!job->txn);
160      job->txn = txn;
161  
162      QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
163      job_txn_ref_locked(txn);
164  }
165  
166  /* Called with job_mutex held. */
job_txn_del_job_locked(Job * job)167  static void job_txn_del_job_locked(Job *job)
168  {
169      if (job->txn) {
170          QLIST_REMOVE(job, txn_list);
171          job_txn_unref_locked(job->txn);
172          job->txn = NULL;
173      }
174  }
175  
/*
 * Apply @fn to every job in @job's transaction, stopping at the first
 * non-zero return value, which is propagated to the caller.
 *
 * Called with job_mutex held; @fn may release it temporarily, so @job
 * is referenced for the duration to keep it alive across those windows.
 *
 * NOTE(review): the original comment here referred to an "outer_ctx"
 * AioContext lock that this function no longer takes or releases;
 * behavior is unchanged, only the stale text was dropped.
 */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    job_ref_locked(job);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}
201  
job_is_internal(Job * job)202  bool job_is_internal(Job *job)
203  {
204      return (job->id == NULL);
205  }
206  
/*
 * Move @job to status @s1, asserting the transition is legal per JobSTT.
 * Emits a QAPI JOB_STATUS_CHANGE event for user-visible jobs when the
 * status actually changes.  Called with job_mutex held.
 */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    /* Internal jobs have no ID and are invisible to the monitor */
    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}
222  
job_apply_verb_locked(Job * job,JobVerb verb,Error ** errp)223  int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
224  {
225      JobStatus s0 = job->status;
226      assert(verb >= 0 && verb < JOB_VERB__MAX);
227      trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
228                           JobVerbTable[verb][s0] ? "allowed" : "prohibited");
229      if (JobVerbTable[verb][s0]) {
230          return 0;
231      }
232      error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
233                 job->id, JobStatus_str(s0), JobVerb_str(verb));
234      return -EPERM;
235  }
236  
/* Return the JobType declared by the job's driver. */
JobType job_type(const Job *job)
{
    return job->driver->job_type;
}
241  
/* Return the job's type as a human-readable string. */
const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}
246  
/*
 * Return whether the job has been *force*-cancelled.  A soft cancel
 * request (job->cancelled set without force_cancel) returns false here;
 * use job_cancel_requested_locked() to see any cancel request.
 * Called with job_mutex held.
 */
bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}
253  
/* Unlocked accessor for job->paused. */
bool job_is_paused(Job *job)
{
    JOB_LOCK_GUARD();
    return job->paused;
}
259  
/* Unlocked wrapper around job_is_cancelled_locked(). */
bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}
265  
/*
 * True when any cancel (soft or forced) has been requested.
 * Called with job_mutex held.
 */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}
271  
/* Unlocked wrapper around job_cancel_requested_locked(). */
bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}
277  
job_is_ready_locked(Job * job)278  bool job_is_ready_locked(Job *job)
279  {
280      switch (job->status) {
281      case JOB_STATUS_UNDEFINED:
282      case JOB_STATUS_CREATED:
283      case JOB_STATUS_RUNNING:
284      case JOB_STATUS_PAUSED:
285      case JOB_STATUS_WAITING:
286      case JOB_STATUS_PENDING:
287      case JOB_STATUS_ABORTING:
288      case JOB_STATUS_CONCLUDED:
289      case JOB_STATUS_NULL:
290          return false;
291      case JOB_STATUS_READY:
292      case JOB_STATUS_STANDBY:
293          return true;
294      default:
295          g_assert_not_reached();
296      }
297      return false;
298  }
299  
/* Unlocked wrapper around job_is_ready_locked(). */
bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}
305  
job_is_completed_locked(Job * job)306  bool job_is_completed_locked(Job *job)
307  {
308      switch (job->status) {
309      case JOB_STATUS_UNDEFINED:
310      case JOB_STATUS_CREATED:
311      case JOB_STATUS_RUNNING:
312      case JOB_STATUS_PAUSED:
313      case JOB_STATUS_READY:
314      case JOB_STATUS_STANDBY:
315          return false;
316      case JOB_STATUS_WAITING:
317      case JOB_STATUS_PENDING:
318      case JOB_STATUS_ABORTING:
319      case JOB_STATUS_CONCLUDED:
320      case JOB_STATUS_NULL:
321          return true;
322      default:
323          g_assert_not_reached();
324      }
325      return false;
326  }
327  
/* Unlocked wrapper around job_is_completed_locked(). */
static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}
333  
job_started_locked(Job * job)334  static bool job_started_locked(Job *job)
335  {
336      return job->co;
337  }
338  
/*
 * True while at least one pause request is outstanding.
 * Called with job_mutex held.
 */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}
344  
/*
 * Iterate the global jobs list: return the first job when @job is NULL,
 * otherwise the job after @job (NULL at the end).
 * Called with job_mutex held.
 */
Job *job_next_locked(Job *job)
{
    return job ? QLIST_NEXT(job, job_list) : QLIST_FIRST(&jobs);
}
352  
/* Unlocked wrapper around job_next_locked(). */
Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}
358  
job_get_locked(const char * id)359  Job *job_get_locked(const char *id)
360  {
361      Job *job;
362  
363      QLIST_FOREACH(job, &jobs, job_list) {
364          if (job->id && !strcmp(id, job->id)) {
365              return job;
366          }
367      }
368  
369      return NULL;
370  }
371  
/*
 * Switch the AioContext the job runs in.  The job must be quiescent
 * (paused or already completed) so nothing is concurrently using the
 * old context.
 */
void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}
382  
/*
 * Expiry callback for job->sleep_timer: re-enter the job coroutine.
 * Called with job_mutex *not* held.
 */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}
390  
/*
 * Create a new job of @driver's type and register it on the global list.
 *
 * @job_id: user-visible ID; required unless JOB_INTERNAL is set in
 *          @flags (and forbidden when it is).  Must be well-formed and
 *          not already in use.
 * @txn:    transaction to join, or NULL to run in a private one-job txn.
 * @ctx:    AioContext the job coroutine will run in.
 * @flags:  JOB_* creation flags (JOB_INTERNAL, JOB_MANUAL_FINALIZE,
 *          JOB_MANUAL_DISMISS).
 * @cb:     completion callback, invoked with @opaque and the job's
 *          return code.
 *
 * Returns the new job — created paused, with refcount 1, in status
 * CREATED — or NULL with @errp set on validation failure.
 */
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    /* instance_size lets drivers embed Job at the head of a larger struct */
    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    /* Jobs start paused; job_start() is expected to resume them */
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
457  
/* Take an additional reference to @job.  Called with job_mutex held. */
void job_ref_locked(Job *job)
{
    ++job->refcnt;
}
462  
/*
 * Drop one reference to @job; on the last reference the job must
 * already be in status NULL and is freed.  Called with job_mutex held,
 * but drops it temporarily around the driver's free() callback.
 */
void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            /* driver callbacks run without the job lock */
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}
486  
/* Account @done additional units of completed work. */
void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}
491  
/* Set the estimated amount of work still remaining to @remaining. */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}
496  
/* Grow the estimated remaining work by @delta units. */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}
501  
/**
 * To be called when a cancelled job is finalised.
 * Runs the on_finalize_cancelled notifiers.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}
510  
/**
 * To be called when a successfully completed job is finalised.
 * Runs the on_finalize_completed notifiers.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}
519  
/* Notify on_pending listeners.  Called with job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}
525  
/* Notify on_ready listeners.  Called with job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}
531  
/* Notify on_idle listeners.  Called with job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}
537  
/*
 * Conditionally wake the job coroutine: when @fn is non-NULL it is
 * evaluated under the lock and must return true for the job to be
 * entered.  Does nothing if the job has not started, has deferred to
 * the main loop, or is currently busy.  Cancels any pending sleep
 * timer and drops job_mutex around aio_co_wake().
 */
void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    /* busy means the coroutine is currently running; it will see any
     * state changes itself */
    if (job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}
562  
/* Unconditionally wake the job coroutine (unlocked wrapper). */
void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}
568  
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    /* busy = false marks the job as re-enterable via job_enter_cond_locked */
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * Coroutine has resumed, but in the meanwhile the job AioContext
     * might have changed via bdrv_try_change_aio_context(), so we need to move
     * the coroutine too in the new aiocontext.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        /* the context may have changed again while we were unlocked */
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    assert(job->busy);
}
607  
/*
 * Cooperative pause point: if a pause has been requested (and the job
 * is not cancelled), transition to PAUSED/STANDBY, run the driver's
 * pause/resume hooks, and yield until resumed.  Drops job_mutex around
 * the driver callbacks and while yielding.
 */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    /* re-check: the driver callback ran without the lock held */
    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        /* READY jobs park in STANDBY so they stay distinguishable */
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        /* restore the pre-pause status */
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}
643  
/* Unlocked wrapper around job_pause_point_locked(). */
void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}
649  
/*
 * Yield the job coroutine with no wakeup timer; job_enter() must be
 * used to resume it.  Honors pending pause requests on the way out.
 */
void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}
666  
/*
 * Yield the job coroutine for @ns nanoseconds (wall clock); job_enter()
 * may wake it earlier.  Honors pending pause requests on the way out.
 */
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
683  
/*
 * Predicate for job_enter_cond_locked(): only wake the job when no
 * sleep timer is armed.  Assumes the job_mutex is held.
 */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}
689  
/*
 * Request a pause: bump pause_count and, if the job is not already
 * parked, kick it so it reaches a pause point.  Called with job_mutex
 * held.
 */
void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (!job->paused) {
        job_enter_cond_locked(job, NULL);
    }
}
697  
/* Unlocked wrapper around job_pause_locked(). */
void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}
703  
/*
 * Withdraw one pause request; the job is only woken once the last
 * outstanding request is gone.  Called with job_mutex held.
 */
void job_resume_locked(Job *job)
{
    assert(job->pause_count > 0);
    if (--job->pause_count > 0) {
        return;
    }

    /* kick only if no timer is pending */
    job_enter_cond_locked(job, job_timer_not_pending_locked);
}
715  
/* Unlocked wrapper around job_resume_locked(). */
void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}
721  
/*
 * Pause the job on behalf of the user (e.g. QMP job-pause).  Fails via
 * @errp when the verb is not allowed in the current status or the job
 * is already user-paused.  Called with job_mutex held.
 */
void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}
734  
/* True while a user-requested pause is in effect.  Called with job_mutex held. */
bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}
739  
/*
 * Resume a job previously paused by the user.  Fails via @errp when the
 * job was not user-paused or the verb is not allowed.  Drops job_mutex
 * around the driver's user_resume callback.
 */
void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        /* driver callbacks run without the job lock */
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}
759  
/*
 * Remove the job from its transaction and the jobs list: transition to
 * NULL status and drop the list's reference.  May release job_mutex
 * temporarily (job_unref_locked can call the driver's free callback).
 */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    /* prevent any further coroutine wakeups */
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}
773  
/*
 * User-requested dismissal of a CONCLUDED job (QMP job-dismiss).
 * Clears *@jobptr on success so the caller cannot reuse the dangling
 * pointer.  Called with job_mutex held.
 */
void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}
786  
/*
 * Tear down a job that failed before it was ever started (must still be
 * in CREATED status).
 */
void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}
793  
/*
 * Transition to CONCLUDED; auto-dismissing jobs (and jobs that never
 * started) are dismissed immediately.  Called with job_mutex held.
 */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}
802  
/*
 * Fold cancellation into job->ret (-ECANCELED) and, on any error,
 * synthesize job->err if unset and move to ABORTING.
 * Called with job_mutex held.
 */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}
816  
/*
 * Run the driver's commit callback for a job that finished with
 * success (job->ret == 0).  Called without job_mutex held.
 */
static void job_commit(Job *job)
{
    void (*fn)(Job *);

    assert(!job->ret);
    GLOBAL_STATE_CODE();
    fn = job->driver->commit;
    if (fn) {
        fn(job);
    }
}
825  
/*
 * Run the driver's abort callback for a job that finished with an
 * error (job->ret != 0).  Called without job_mutex held.
 */
static void job_abort(Job *job)
{
    void (*fn)(Job *);

    assert(job->ret);
    GLOBAL_STATE_CODE();
    fn = job->driver->abort;
    if (fn) {
        fn(job);
    }
}
834  
/*
 * Run the driver's clean callback; invoked after commit or abort,
 * regardless of outcome.  Called without job_mutex held.
 */
static void job_clean(Job *job)
{
    void (*fn)(Job *);

    GLOBAL_STATE_CODE();
    fn = job->driver->clean;
    if (fn) {
        fn(job);
    }
}
842  
/*
 * Finalize one completed job: run commit/abort + clean and the
 * completion callback (all without the lock), emit the finalize event,
 * detach from the transaction and conclude.  Always returns 0 (matches
 * the job_txn_apply_locked() callback signature).
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    job_ret = job->ret;
    job_unlock();

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job_ret);
    }

    job_lock();

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}
884  
/*
 * Request asynchronous cancellation of @job.  @force requests immediate
 * termination; the driver's cancel callback may override it.  Also
 * undoes any user pause so the job can make progress toward the cancel.
 * The caller is responsible for entering the job afterwards.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_cancel_async_locked(Job *job, bool force)
{
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        force = job->driver->cancel(job, force);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too.  We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}
925  
/*
 * Abort @job's whole transaction: force-cancel every other member,
 * wait for them to finish, and finalize them all.  A no-op if another
 * job is already driving the abort.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_completed_txn_abort_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* keep txn and job alive across the lock-dropping calls below */
    job_txn_ref_locked(txn);

    job_ref_locked(job);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
        }
    }
    /* job_finalize_single_locked() removes each job from txn->jobs */
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
    }

    job_unref_locked(job);
    job_txn_unref_locked(txn);
}
970  
/*
 * Run the driver's prepare callback (only for jobs that have succeeded
 * so far) and fold its result into job->ret.  Returns the resulting
 * job->ret.  Called with job_mutex held, but releases it temporarily.
 */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();

    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}
988  
989  /* Called with job_mutex held */
job_needs_finalize_locked(Job * job)990  static int job_needs_finalize_locked(Job *job)
991  {
992      return !job->auto_finalize;
993  }
994  
995  /* Called with job_mutex held */
job_do_finalize_locked(Job * job)996  static void job_do_finalize_locked(Job *job)
997  {
998      int rc;
999      assert(job && job->txn);
1000  
1001      /* prepare the transaction to complete */
1002      rc = job_txn_apply_locked(job, job_prepare_locked);
1003      if (rc) {
1004          job_completed_txn_abort_locked(job);
1005      } else {
1006          job_txn_apply_locked(job, job_finalize_single_locked);
1007      }
1008  }
1009  
job_finalize_locked(Job * job,Error ** errp)1010  void job_finalize_locked(Job *job, Error **errp)
1011  {
1012      assert(job && job->id);
1013      if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
1014          return;
1015      }
1016      job_do_finalize_locked(job);
1017  }
1018  
1019  /* Called with job_mutex held. */
job_transition_to_pending_locked(Job * job)1020  static int job_transition_to_pending_locked(Job *job)
1021  {
1022      job_state_transition_locked(job, JOB_STATUS_PENDING);
1023      if (!job->auto_finalize) {
1024          job_event_pending_locked(job);
1025      }
1026      return 0;
1027  }
1028  
/* Move @job to READY and notify listeners; takes job_mutex itself. */
void job_transition_to_ready(Job *job)
{
    WITH_JOB_LOCK_GUARD() {
        job_state_transition_locked(job, JOB_STATUS_READY);
        job_event_ready_locked(job);
    }
}
1035  
1036  /* Called with job_mutex held. */
job_completed_txn_success_locked(Job * job)1037  static void job_completed_txn_success_locked(Job *job)
1038  {
1039      JobTxn *txn = job->txn;
1040      Job *other_job;
1041  
1042      job_state_transition_locked(job, JOB_STATUS_WAITING);
1043  
1044      /*
1045       * Successful completion, see if there are other running jobs in this
1046       * txn.
1047       */
1048      QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
1049          if (!job_is_completed_locked(other_job)) {
1050              return;
1051          }
1052          assert(other_job->ret == 0);
1053      }
1054  
1055      job_txn_apply_locked(job, job_transition_to_pending_locked);
1056  
1057      /* If no jobs need manual finalization, automatically do so */
1058      if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
1059          job_do_finalize_locked(job);
1060      }
1061  }
1062  
1063  /* Called with job_mutex held. */
job_completed_locked(Job * job)1064  static void job_completed_locked(Job *job)
1065  {
1066      assert(job && job->txn && !job_is_completed_locked(job));
1067  
1068      job_update_rc_locked(job);
1069      trace_job_completed(job, job->ret);
1070      if (job->ret) {
1071          job_completed_txn_abort_locked(job);
1072      } else {
1073          job_completed_txn_success_locked(job);
1074      }
1075  }
1076  
/**
 * Bottom-half callback that finishes a job after its coroutine has
 * deferred to the main loop (scheduled from job_co_entry()).
 *
 * Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    JOB_LOCK_GUARD();
    /* Keep the job alive through the completion callbacks below */
    job_ref_locked(job);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    /* Runs the success or abort path based on job->ret */
    job_completed_locked(job);
    job_unref_locked(job);
}
1097  
/**
 * Coroutine entry point for every job: takes an initial pause point,
 * runs the driver's .run() callback, then defers completion to the
 * main loop via a one-shot bottom half (job_exit()).
 *
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        /* The coroutine runs in the job's own AioContext */
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    /* Driver callback runs without job_mutex held */
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        /*
         * Stay "busy" until job_exit() flips it back; see the comment
         * there about completion callbacks and drained_poll.
         */
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}
1120  
/*
 * Start a freshly created job: create its coroutine, lift the initial
 * pause taken at creation, transition to RUNNING, and enter the
 * coroutine in the job's AioContext.  Main-loop only.
 */
void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        /* Undo the implicit pause taken when the job was created */
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    /* Enter the coroutine outside the lock; job_co_entry relocks */
    aio_co_enter(job->aio_context, job->co);
}
1136  
/*
 * Cancel @job.  @force distinguishes a hard cancel from a soft
 * (result-preserving) cancel; see job_cancel_async_locked().
 * A CONCLUDED job is simply dismissed.
 *
 * Called with job_mutex held.
 */
void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        /* Never ran: complete it right here with the cancel result */
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        /* Still running: kick the coroutine so it notices the request */
        job_enter_cond_locked(job, NULL);
    }
}
1164  
job_user_cancel_locked(Job * job,bool force,Error ** errp)1165  void job_user_cancel_locked(Job *job, bool force, Error **errp)
1166  {
1167      if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
1168          return;
1169      }
1170      job_cancel_locked(job, force);
1171  }
1172  
/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
 * be used with job_finish_sync_locked() without the need for (rather nasty)
 * function pointer casts there.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    /* @errp is unused; job_cancel_locked() cannot fail here */
    job_cancel_locked(job, false);
}
1183  
/**
 * Same as job_cancel_err(), but force-cancel.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    /* @errp is unused; job_cancel_locked() cannot fail here */
    job_cancel_locked(job, true);
}
1192  
/*
 * Synchronously cancel @job and wait for it to finish; returns the
 * job's final return code (see job_finish_sync_locked()).
 */
int job_cancel_sync_locked(Job *job, bool force)
{
    void (*cancel_fn)(Job *, Error **) =
        force ? job_force_cancel_err_locked : job_cancel_err_locked;

    return job_finish_sync_locked(job, cancel_fn, NULL);
}
1201  
/* Lock-taking wrapper around job_cancel_sync_locked(). */
int job_cancel_sync(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    return job_cancel_sync_locked(job, force);
}
1207  
job_cancel_sync_all(void)1208  void job_cancel_sync_all(void)
1209  {
1210      Job *job;
1211      JOB_LOCK_GUARD();
1212  
1213      while ((job = job_next_locked(NULL))) {
1214          job_cancel_sync_locked(job, true);
1215      }
1216  }
1217  
/*
 * Synchronously complete @job and wait for it to finish; returns the
 * job's final return code (see job_finish_sync_locked()).
 */
int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}
1222  
/*
 * Ask @job to complete early via its driver's .complete() callback.
 * Fails (via @errp) if the COMPLETE verb is not valid in the current
 * state, if cancellation was requested, or if the driver does not
 * support early completion.
 *
 * Called with job_mutex held, but releases it around the driver
 * callback.
 */
void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    /* Drop the lock while calling into the driver */
    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}
1241  
/*
 * Run @finish on @job (if non-NULL), then wait for the job to complete.
 *
 * Returns -EBUSY if @finish reported an error, -ECANCELED if the job
 * was cancelled without its own error code, otherwise job->ret.
 *
 * Called with job_mutex held, but releases it while polling.
 */
int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    /* Keep the job alive while we wait on it */
    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    /* Poll without the lock; keep kicking the job until it completes */
    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    /* Map a "clean" cancellation onto -ECANCELED for the caller */
    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}
1271