1d60bafe5SSasha Levin #include "kvm/threadpool.h"
2d60bafe5SSasha Levin #include "kvm/mutex.h"
349a8afd1SSasha Levin #include "kvm/kvm.h"
4d60bafe5SSasha Levin
5d60bafe5SSasha Levin #include <linux/kernel.h>
6d60bafe5SSasha Levin #include <linux/list.h>
7d60bafe5SSasha Levin #include <pthread.h>
8d60bafe5SSasha Levin #include <stdbool.h>
9d60bafe5SSasha Levin
10d3476f7dSSasha Levin static DEFINE_MUTEX(job_mutex);
11d3476f7dSSasha Levin static DEFINE_MUTEX(thread_mutex);
12d60bafe5SSasha Levin static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;
13d60bafe5SSasha Levin
14d60bafe5SSasha Levin static LIST_HEAD(head);
15d60bafe5SSasha Levin
16d60bafe5SSasha Levin static pthread_t *threads;
17d60bafe5SSasha Levin static long threadcount;
18f6a3c571SSasha Levin static bool running;
19d60bafe5SSasha Levin
thread_pool__job_pop_locked(void)20a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop_locked(void)
21d60bafe5SSasha Levin {
22f6a083e9SSasha Levin struct thread_pool__job *job;
23d60bafe5SSasha Levin
24d60bafe5SSasha Levin if (list_empty(&head))
25d60bafe5SSasha Levin return NULL;
26d60bafe5SSasha Levin
27f6a083e9SSasha Levin job = list_first_entry(&head, struct thread_pool__job, queue);
28*7a7f4542SJean-Philippe Brucker list_del_init(&job->queue);
29d60bafe5SSasha Levin
30d60bafe5SSasha Levin return job;
31d60bafe5SSasha Levin }
32d60bafe5SSasha Levin
thread_pool__job_push_locked(struct thread_pool__job * job)33a4cae5abSLai Jiangshan static void thread_pool__job_push_locked(struct thread_pool__job *job)
34d60bafe5SSasha Levin {
35d60bafe5SSasha Levin list_add_tail(&job->queue, &head);
36d60bafe5SSasha Levin }
37d60bafe5SSasha Levin
thread_pool__job_pop(void)38a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop(void)
39d60bafe5SSasha Levin {
40f6a083e9SSasha Levin struct thread_pool__job *job;
41d60bafe5SSasha Levin
42d60bafe5SSasha Levin mutex_lock(&job_mutex);
43a4cae5abSLai Jiangshan job = thread_pool__job_pop_locked();
44d60bafe5SSasha Levin mutex_unlock(&job_mutex);
45d60bafe5SSasha Levin return job;
46d60bafe5SSasha Levin }
47d60bafe5SSasha Levin
thread_pool__job_push(struct thread_pool__job * job)48a4cae5abSLai Jiangshan static void thread_pool__job_push(struct thread_pool__job *job)
49d60bafe5SSasha Levin {
50d60bafe5SSasha Levin mutex_lock(&job_mutex);
51a4cae5abSLai Jiangshan thread_pool__job_push_locked(job);
52d60bafe5SSasha Levin mutex_unlock(&job_mutex);
53d60bafe5SSasha Levin }
54d60bafe5SSasha Levin
thread_pool__handle_job(struct thread_pool__job * job)55f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job)
56d60bafe5SSasha Levin {
57d60bafe5SSasha Levin while (job) {
58d60bafe5SSasha Levin job->callback(job->kvm, job->data);
59d60bafe5SSasha Levin
60d60bafe5SSasha Levin mutex_lock(&job->mutex);
61d60bafe5SSasha Levin
62d60bafe5SSasha Levin if (--job->signalcount > 0)
63d60bafe5SSasha Levin /* If the job was signaled again while we were working */
64a4cae5abSLai Jiangshan thread_pool__job_push(job);
65d60bafe5SSasha Levin
66d60bafe5SSasha Levin mutex_unlock(&job->mutex);
67d60bafe5SSasha Levin
68a4cae5abSLai Jiangshan job = thread_pool__job_pop();
69d60bafe5SSasha Levin }
70d60bafe5SSasha Levin }
71d60bafe5SSasha Levin
thread_pool__threadfunc_cleanup(void * param)72d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param)
73d60bafe5SSasha Levin {
74d60bafe5SSasha Levin mutex_unlock(&job_mutex);
75d60bafe5SSasha Levin }
76d60bafe5SSasha Levin
thread_pool__threadfunc(void * param)77d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param)
78d60bafe5SSasha Levin {
79d60bafe5SSasha Levin pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
80d60bafe5SSasha Levin
81a4d8c55eSSasha Levin kvm__set_thread_name("threadpool-worker");
82a4d8c55eSSasha Levin
83f6a3c571SSasha Levin while (running) {
84feceecd7SSasha Levin struct thread_pool__job *curjob = NULL;
85d60bafe5SSasha Levin
86d60bafe5SSasha Levin mutex_lock(&job_mutex);
87f6a3c571SSasha Levin while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
88d3476f7dSSasha Levin pthread_cond_wait(&job_cond, &job_mutex.mutex);
89d60bafe5SSasha Levin mutex_unlock(&job_mutex);
90d60bafe5SSasha Levin
91f6a3c571SSasha Levin if (running)
92d60bafe5SSasha Levin thread_pool__handle_job(curjob);
93d60bafe5SSasha Levin }
94d60bafe5SSasha Levin
95d60bafe5SSasha Levin pthread_cleanup_pop(0);
96d60bafe5SSasha Levin
97d60bafe5SSasha Levin return NULL;
98d60bafe5SSasha Levin }
99d60bafe5SSasha Levin
thread_pool__addthread(void)100d60bafe5SSasha Levin static int thread_pool__addthread(void)
101d60bafe5SSasha Levin {
102d60bafe5SSasha Levin int res;
103d60bafe5SSasha Levin void *newthreads;
104d60bafe5SSasha Levin
105d60bafe5SSasha Levin mutex_lock(&thread_mutex);
106d60bafe5SSasha Levin newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
107d60bafe5SSasha Levin if (newthreads == NULL) {
108d60bafe5SSasha Levin mutex_unlock(&thread_mutex);
109d60bafe5SSasha Levin return -1;
110d60bafe5SSasha Levin }
111d60bafe5SSasha Levin
112d60bafe5SSasha Levin threads = newthreads;
113d60bafe5SSasha Levin
114d60bafe5SSasha Levin res = pthread_create(threads + threadcount, NULL,
115d60bafe5SSasha Levin thread_pool__threadfunc, NULL);
116d60bafe5SSasha Levin
117d60bafe5SSasha Levin if (res == 0)
118d60bafe5SSasha Levin threadcount++;
119d60bafe5SSasha Levin mutex_unlock(&thread_mutex);
120d60bafe5SSasha Levin
121d60bafe5SSasha Levin return res;
122d60bafe5SSasha Levin }
123d60bafe5SSasha Levin
thread_pool__init(struct kvm * kvm)124f6a3c571SSasha Levin int thread_pool__init(struct kvm *kvm)
125d60bafe5SSasha Levin {
126d60bafe5SSasha Levin unsigned long i;
127f6a3c571SSasha Levin unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
128f6a3c571SSasha Levin
129f6a3c571SSasha Levin running = true;
130d60bafe5SSasha Levin
131d60bafe5SSasha Levin for (i = 0; i < thread_count; i++)
132d60bafe5SSasha Levin if (thread_pool__addthread() < 0)
133d60bafe5SSasha Levin return i;
134d60bafe5SSasha Levin
135d60bafe5SSasha Levin return i;
136d60bafe5SSasha Levin }
13749a8afd1SSasha Levin late_init(thread_pool__init);
138d60bafe5SSasha Levin
thread_pool__exit(struct kvm * kvm)139f6a3c571SSasha Levin int thread_pool__exit(struct kvm *kvm)
140f6a3c571SSasha Levin {
141f6a3c571SSasha Levin int i;
142f6a3c571SSasha Levin void *NUL = NULL;
143f6a3c571SSasha Levin
144f6a3c571SSasha Levin running = false;
145f6a3c571SSasha Levin
146f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) {
147f6a3c571SSasha Levin mutex_lock(&job_mutex);
148f6a3c571SSasha Levin pthread_cond_signal(&job_cond);
149f6a3c571SSasha Levin mutex_unlock(&job_mutex);
150f6a3c571SSasha Levin }
151f6a3c571SSasha Levin
152f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) {
153f6a3c571SSasha Levin pthread_join(threads[i], NUL);
154f6a3c571SSasha Levin }
155f6a3c571SSasha Levin
156f6a3c571SSasha Levin return 0;
157f6a3c571SSasha Levin }
15849a8afd1SSasha Levin late_exit(thread_pool__exit);
159f6a3c571SSasha Levin
thread_pool__do_job(struct thread_pool__job * job)160df0c7f57SSasha Levin void thread_pool__do_job(struct thread_pool__job *job)
161d60bafe5SSasha Levin {
162f6a083e9SSasha Levin struct thread_pool__job *jobinfo = job;
163d60bafe5SSasha Levin
164df0c7f57SSasha Levin if (jobinfo == NULL || jobinfo->callback == NULL)
165d60bafe5SSasha Levin return;
166d60bafe5SSasha Levin
167d60bafe5SSasha Levin mutex_lock(&jobinfo->mutex);
168d60bafe5SSasha Levin if (jobinfo->signalcount++ == 0)
169a4cae5abSLai Jiangshan thread_pool__job_push(job);
170d60bafe5SSasha Levin mutex_unlock(&jobinfo->mutex);
171d60bafe5SSasha Levin
1729449cd3cSSasha Levin mutex_lock(&job_mutex);
173d60bafe5SSasha Levin pthread_cond_signal(&job_cond);
1749449cd3cSSasha Levin mutex_unlock(&job_mutex);
175d60bafe5SSasha Levin }
176*7a7f4542SJean-Philippe Brucker
thread_pool__cancel_job(struct thread_pool__job * job)177*7a7f4542SJean-Philippe Brucker void thread_pool__cancel_job(struct thread_pool__job *job)
178*7a7f4542SJean-Philippe Brucker {
179*7a7f4542SJean-Philippe Brucker bool running;
180*7a7f4542SJean-Philippe Brucker
181*7a7f4542SJean-Philippe Brucker /*
182*7a7f4542SJean-Philippe Brucker * If the job is queued but not running, remove it. Otherwise, wait for
183*7a7f4542SJean-Philippe Brucker * the signalcount to drop to 0, indicating that it has finished
184*7a7f4542SJean-Philippe Brucker * running. We assume that nobody is queueing this job -
185*7a7f4542SJean-Philippe Brucker * thread_pool__do_job() isn't called - while this function is running.
186*7a7f4542SJean-Philippe Brucker */
187*7a7f4542SJean-Philippe Brucker do {
188*7a7f4542SJean-Philippe Brucker mutex_lock(&job_mutex);
189*7a7f4542SJean-Philippe Brucker if (list_empty(&job->queue)) {
190*7a7f4542SJean-Philippe Brucker running = job->signalcount > 0;
191*7a7f4542SJean-Philippe Brucker } else {
192*7a7f4542SJean-Philippe Brucker list_del_init(&job->queue);
193*7a7f4542SJean-Philippe Brucker job->signalcount = 0;
194*7a7f4542SJean-Philippe Brucker running = false;
195*7a7f4542SJean-Philippe Brucker }
196*7a7f4542SJean-Philippe Brucker mutex_unlock(&job_mutex);
197*7a7f4542SJean-Philippe Brucker } while (running);
198*7a7f4542SJean-Philippe Brucker }
199