1 #include "kvm/threadpool.h"
2 #include "kvm/mutex.h"
3 #include "kvm/kvm.h"
4
5 #include <linux/kernel.h>
6 #include <linux/list.h>
7 #include <pthread.h>
8 #include <stdbool.h>
9
10 static DEFINE_MUTEX(job_mutex);
11 static DEFINE_MUTEX(thread_mutex);
12 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;
13
14 static LIST_HEAD(head);
15
16 static pthread_t *threads;
17 static long threadcount;
18 static bool running;
19
thread_pool__job_pop_locked(void)20 static struct thread_pool__job *thread_pool__job_pop_locked(void)
21 {
22 struct thread_pool__job *job;
23
24 if (list_empty(&head))
25 return NULL;
26
27 job = list_first_entry(&head, struct thread_pool__job, queue);
28 list_del_init(&job->queue);
29
30 return job;
31 }
32
thread_pool__job_push_locked(struct thread_pool__job * job)33 static void thread_pool__job_push_locked(struct thread_pool__job *job)
34 {
35 list_add_tail(&job->queue, &head);
36 }
37
thread_pool__job_pop(void)38 static struct thread_pool__job *thread_pool__job_pop(void)
39 {
40 struct thread_pool__job *job;
41
42 mutex_lock(&job_mutex);
43 job = thread_pool__job_pop_locked();
44 mutex_unlock(&job_mutex);
45 return job;
46 }
47
thread_pool__job_push(struct thread_pool__job * job)48 static void thread_pool__job_push(struct thread_pool__job *job)
49 {
50 mutex_lock(&job_mutex);
51 thread_pool__job_push_locked(job);
52 mutex_unlock(&job_mutex);
53 }
54
thread_pool__handle_job(struct thread_pool__job * job)55 static void thread_pool__handle_job(struct thread_pool__job *job)
56 {
57 while (job) {
58 job->callback(job->kvm, job->data);
59
60 mutex_lock(&job->mutex);
61
62 if (--job->signalcount > 0)
63 /* If the job was signaled again while we were working */
64 thread_pool__job_push(job);
65
66 mutex_unlock(&job->mutex);
67
68 job = thread_pool__job_pop();
69 }
70 }
71
thread_pool__threadfunc_cleanup(void * param)72 static void thread_pool__threadfunc_cleanup(void *param)
73 {
74 mutex_unlock(&job_mutex);
75 }
76
thread_pool__threadfunc(void * param)77 static void *thread_pool__threadfunc(void *param)
78 {
79 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
80
81 kvm__set_thread_name("threadpool-worker");
82
83 while (running) {
84 struct thread_pool__job *curjob = NULL;
85
86 mutex_lock(&job_mutex);
87 while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
88 pthread_cond_wait(&job_cond, &job_mutex.mutex);
89 mutex_unlock(&job_mutex);
90
91 if (running)
92 thread_pool__handle_job(curjob);
93 }
94
95 pthread_cleanup_pop(0);
96
97 return NULL;
98 }
99
thread_pool__addthread(void)100 static int thread_pool__addthread(void)
101 {
102 int res;
103 void *newthreads;
104
105 mutex_lock(&thread_mutex);
106 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
107 if (newthreads == NULL) {
108 mutex_unlock(&thread_mutex);
109 return -1;
110 }
111
112 threads = newthreads;
113
114 res = pthread_create(threads + threadcount, NULL,
115 thread_pool__threadfunc, NULL);
116
117 if (res == 0)
118 threadcount++;
119 mutex_unlock(&thread_mutex);
120
121 return res;
122 }
123
thread_pool__init(struct kvm * kvm)124 int thread_pool__init(struct kvm *kvm)
125 {
126 unsigned long i;
127 unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
128
129 running = true;
130
131 for (i = 0; i < thread_count; i++)
132 if (thread_pool__addthread() < 0)
133 return i;
134
135 return i;
136 }
137 late_init(thread_pool__init);
138
thread_pool__exit(struct kvm * kvm)139 int thread_pool__exit(struct kvm *kvm)
140 {
141 int i;
142 void *NUL = NULL;
143
144 running = false;
145
146 for (i = 0; i < threadcount; i++) {
147 mutex_lock(&job_mutex);
148 pthread_cond_signal(&job_cond);
149 mutex_unlock(&job_mutex);
150 }
151
152 for (i = 0; i < threadcount; i++) {
153 pthread_join(threads[i], NUL);
154 }
155
156 return 0;
157 }
158 late_exit(thread_pool__exit);
159
thread_pool__do_job(struct thread_pool__job * job)160 void thread_pool__do_job(struct thread_pool__job *job)
161 {
162 struct thread_pool__job *jobinfo = job;
163
164 if (jobinfo == NULL || jobinfo->callback == NULL)
165 return;
166
167 mutex_lock(&jobinfo->mutex);
168 if (jobinfo->signalcount++ == 0)
169 thread_pool__job_push(job);
170 mutex_unlock(&jobinfo->mutex);
171
172 mutex_lock(&job_mutex);
173 pthread_cond_signal(&job_cond);
174 mutex_unlock(&job_mutex);
175 }
176
thread_pool__cancel_job(struct thread_pool__job * job)177 void thread_pool__cancel_job(struct thread_pool__job *job)
178 {
179 bool running;
180
181 /*
182 * If the job is queued but not running, remove it. Otherwise, wait for
183 * the signalcount to drop to 0, indicating that it has finished
184 * running. We assume that nobody is queueing this job -
185 * thread_pool__do_job() isn't called - while this function is running.
186 */
187 do {
188 mutex_lock(&job_mutex);
189 if (list_empty(&job->queue)) {
190 running = job->signalcount > 0;
191 } else {
192 list_del_init(&job->queue);
193 job->signalcount = 0;
194 running = false;
195 }
196 mutex_unlock(&job_mutex);
197 } while (running);
198 }
199