xref: /qemu/iothread.c (revision 30456d5ba3736b8f0c73560e559cdd2f76a4885e)
1  /*
2   * Event loop thread
3   *
4   * Copyright Red Hat Inc., 2013
5   *
6   * Authors:
7   *  Stefan Hajnoczi   <stefanha@redhat.com>
8   *
9   * This work is licensed under the terms of the GNU GPL, version 2 or later.
10   * See the COPYING file in the top-level directory.
11   *
12   */
13  
14  #include "qemu/osdep.h"
15  #include "qom/object.h"
16  #include "qom/object_interfaces.h"
17  #include "qemu/module.h"
18  #include "block/aio.h"
19  #include "sysemu/iothread.h"
20  #include "qmp-commands.h"
21  #include "qemu/error-report.h"
22  #include "qemu/rcu.h"
23  
24  typedef ObjectClass IOThreadClass;
25  
26  #define IOTHREAD_GET_CLASS(obj) \
27     OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
28  #define IOTHREAD_CLASS(klass) \
29     OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
30  
31  static void *iothread_run(void *opaque)
32  {
33      IOThread *iothread = opaque;
34      bool blocking;
35  
36      rcu_register_thread();
37  
38      qemu_mutex_lock(&iothread->init_done_lock);
39      iothread->thread_id = qemu_get_thread_id();
40      qemu_cond_signal(&iothread->init_done_cond);
41      qemu_mutex_unlock(&iothread->init_done_lock);
42  
43      while (!iothread->stopping) {
44          aio_context_acquire(iothread->ctx);
45          blocking = true;
46          while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
47              /* Progress was made, keep going */
48              blocking = false;
49          }
50          aio_context_release(iothread->ctx);
51      }
52  
53      rcu_unregister_thread();
54      return NULL;
55  }
56  
57  static void iothread_instance_finalize(Object *obj)
58  {
59      IOThread *iothread = IOTHREAD(obj);
60  
61      if (!iothread->ctx) {
62          return;
63      }
64      iothread->stopping = true;
65      aio_notify(iothread->ctx);
66      qemu_thread_join(&iothread->thread);
67      qemu_cond_destroy(&iothread->init_done_cond);
68      qemu_mutex_destroy(&iothread->init_done_lock);
69      aio_context_unref(iothread->ctx);
70  }
71  
72  static void iothread_complete(UserCreatable *obj, Error **errp)
73  {
74      Error *local_error = NULL;
75      IOThread *iothread = IOTHREAD(obj);
76      char *name, *thread_name;
77  
78      iothread->stopping = false;
79      iothread->thread_id = -1;
80      iothread->ctx = aio_context_new(&local_error);
81      if (!iothread->ctx) {
82          error_propagate(errp, local_error);
83          return;
84      }
85  
86      qemu_mutex_init(&iothread->init_done_lock);
87      qemu_cond_init(&iothread->init_done_cond);
88  
89      /* This assumes we are called from a thread with useful CPU affinity for us
90       * to inherit.
91       */
92      name = object_get_canonical_path_component(OBJECT(obj));
93      thread_name = g_strdup_printf("IO %s", name);
94      qemu_thread_create(&iothread->thread, thread_name, iothread_run,
95                         iothread, QEMU_THREAD_JOINABLE);
96      g_free(thread_name);
97      g_free(name);
98  
99      /* Wait for initialization to complete */
100      qemu_mutex_lock(&iothread->init_done_lock);
101      while (iothread->thread_id == -1) {
102          qemu_cond_wait(&iothread->init_done_cond,
103                         &iothread->init_done_lock);
104      }
105      qemu_mutex_unlock(&iothread->init_done_lock);
106  }
107  
108  static void iothread_class_init(ObjectClass *klass, void *class_data)
109  {
110      UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
111      ucc->complete = iothread_complete;
112  }
113  
114  static const TypeInfo iothread_info = {
115      .name = TYPE_IOTHREAD,
116      .parent = TYPE_OBJECT,
117      .class_init = iothread_class_init,
118      .instance_size = sizeof(IOThread),
119      .instance_finalize = iothread_instance_finalize,
120      .interfaces = (InterfaceInfo[]) {
121          {TYPE_USER_CREATABLE},
122          {}
123      },
124  };
125  
126  static void iothread_register_types(void)
127  {
128      type_register_static(&iothread_info);
129  }
130  
131  type_init(iothread_register_types)
132  
133  char *iothread_get_id(IOThread *iothread)
134  {
135      return object_get_canonical_path_component(OBJECT(iothread));
136  }
137  
138  AioContext *iothread_get_aio_context(IOThread *iothread)
139  {
140      return iothread->ctx;
141  }
142  
143  static int query_one_iothread(Object *object, void *opaque)
144  {
145      IOThreadInfoList ***prev = opaque;
146      IOThreadInfoList *elem;
147      IOThreadInfo *info;
148      IOThread *iothread;
149  
150      iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
151      if (!iothread) {
152          return 0;
153      }
154  
155      info = g_new0(IOThreadInfo, 1);
156      info->id = iothread_get_id(iothread);
157      info->thread_id = iothread->thread_id;
158  
159      elem = g_new0(IOThreadInfoList, 1);
160      elem->value = info;
161      elem->next = NULL;
162  
163      **prev = elem;
164      *prev = &elem->next;
165      return 0;
166  }
167  
168  IOThreadInfoList *qmp_query_iothreads(Error **errp)
169  {
170      IOThreadInfoList *head = NULL;
171      IOThreadInfoList **prev = &head;
172      Object *container = object_get_objects_root();
173  
174      object_child_foreach(container, query_one_iothread, &prev);
175      return head;
176  }
177