xref: /kvmtool/util/threadpool.c (revision 49a8afd1b9a4e503bdafb2bbc04549e03d514836)
1d60bafe5SSasha Levin #include "kvm/threadpool.h"
2d60bafe5SSasha Levin #include "kvm/mutex.h"
3*49a8afd1SSasha Levin #include "kvm/kvm.h"
4d60bafe5SSasha Levin 
5d60bafe5SSasha Levin #include <linux/kernel.h>
6d60bafe5SSasha Levin #include <linux/list.h>
7d60bafe5SSasha Levin #include <pthread.h>
8d60bafe5SSasha Levin #include <stdbool.h>
9d60bafe5SSasha Levin 
/* Guards the pending-job list 'head'; also the mutex paired with job_cond. */
static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Signalled when a job is submitted and when the pool is asked to stop. */
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;
/* Serializes growth of the 'threads' array in thread_pool__addthread(). */
static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Pending jobs, linked through the 'queue' member of struct thread_pool__job. */
static LIST_HEAD(head);

/* Handles of the created worker threads and how many of them are valid. */
static pthread_t *threads;
static long threadcount;

/* Worker loops keep running while this is true. */
static bool running;
19d60bafe5SSasha Levin 
20a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop_locked(void)
21d60bafe5SSasha Levin {
22f6a083e9SSasha Levin 	struct thread_pool__job *job;
23d60bafe5SSasha Levin 
24d60bafe5SSasha Levin 	if (list_empty(&head))
25d60bafe5SSasha Levin 		return NULL;
26d60bafe5SSasha Levin 
27f6a083e9SSasha Levin 	job = list_first_entry(&head, struct thread_pool__job, queue);
28d60bafe5SSasha Levin 	list_del(&job->queue);
29d60bafe5SSasha Levin 
30d60bafe5SSasha Levin 	return job;
31d60bafe5SSasha Levin }
32d60bafe5SSasha Levin 
33a4cae5abSLai Jiangshan static void thread_pool__job_push_locked(struct thread_pool__job *job)
34d60bafe5SSasha Levin {
35d60bafe5SSasha Levin 	list_add_tail(&job->queue, &head);
36d60bafe5SSasha Levin }
37d60bafe5SSasha Levin 
38a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop(void)
39d60bafe5SSasha Levin {
40f6a083e9SSasha Levin 	struct thread_pool__job *job;
41d60bafe5SSasha Levin 
42d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
43a4cae5abSLai Jiangshan 	job = thread_pool__job_pop_locked();
44d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
45d60bafe5SSasha Levin 	return job;
46d60bafe5SSasha Levin }
47d60bafe5SSasha Levin 
48a4cae5abSLai Jiangshan static void thread_pool__job_push(struct thread_pool__job *job)
49d60bafe5SSasha Levin {
50d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
51a4cae5abSLai Jiangshan 	thread_pool__job_push_locked(job);
52d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
53d60bafe5SSasha Levin }
54d60bafe5SSasha Levin 
55f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job)
56d60bafe5SSasha Levin {
57d60bafe5SSasha Levin 	while (job) {
58d60bafe5SSasha Levin 		job->callback(job->kvm, job->data);
59d60bafe5SSasha Levin 
60d60bafe5SSasha Levin 		mutex_lock(&job->mutex);
61d60bafe5SSasha Levin 
62d60bafe5SSasha Levin 		if (--job->signalcount > 0)
63d60bafe5SSasha Levin 			/* If the job was signaled again while we were working */
64a4cae5abSLai Jiangshan 			thread_pool__job_push(job);
65d60bafe5SSasha Levin 
66d60bafe5SSasha Levin 		mutex_unlock(&job->mutex);
67d60bafe5SSasha Levin 
68a4cae5abSLai Jiangshan 		job = thread_pool__job_pop();
69d60bafe5SSasha Levin 	}
70d60bafe5SSasha Levin }
71d60bafe5SSasha Levin 
72d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param)
73d60bafe5SSasha Levin {
74d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
75d60bafe5SSasha Levin }
76d60bafe5SSasha Levin 
77d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param)
78d60bafe5SSasha Levin {
79d60bafe5SSasha Levin 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
80d60bafe5SSasha Levin 
81f6a3c571SSasha Levin 	while (running) {
82feceecd7SSasha Levin 		struct thread_pool__job *curjob = NULL;
83d60bafe5SSasha Levin 
84d60bafe5SSasha Levin 		mutex_lock(&job_mutex);
85f6a3c571SSasha Levin 		while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
86d60bafe5SSasha Levin 			pthread_cond_wait(&job_cond, &job_mutex);
87d60bafe5SSasha Levin 		mutex_unlock(&job_mutex);
88d60bafe5SSasha Levin 
89f6a3c571SSasha Levin 		if (running)
90d60bafe5SSasha Levin 			thread_pool__handle_job(curjob);
91d60bafe5SSasha Levin 	}
92d60bafe5SSasha Levin 
93d60bafe5SSasha Levin 	pthread_cleanup_pop(0);
94d60bafe5SSasha Levin 
95d60bafe5SSasha Levin 	return NULL;
96d60bafe5SSasha Levin }
97d60bafe5SSasha Levin 
98d60bafe5SSasha Levin static int thread_pool__addthread(void)
99d60bafe5SSasha Levin {
100d60bafe5SSasha Levin 	int res;
101d60bafe5SSasha Levin 	void *newthreads;
102d60bafe5SSasha Levin 
103d60bafe5SSasha Levin 	mutex_lock(&thread_mutex);
104d60bafe5SSasha Levin 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
105d60bafe5SSasha Levin 	if (newthreads == NULL) {
106d60bafe5SSasha Levin 		mutex_unlock(&thread_mutex);
107d60bafe5SSasha Levin 		return -1;
108d60bafe5SSasha Levin 	}
109d60bafe5SSasha Levin 
110d60bafe5SSasha Levin 	threads = newthreads;
111d60bafe5SSasha Levin 
112d60bafe5SSasha Levin 	res = pthread_create(threads + threadcount, NULL,
113d60bafe5SSasha Levin 			     thread_pool__threadfunc, NULL);
114d60bafe5SSasha Levin 
115d60bafe5SSasha Levin 	if (res == 0)
116d60bafe5SSasha Levin 		threadcount++;
117d60bafe5SSasha Levin 	mutex_unlock(&thread_mutex);
118d60bafe5SSasha Levin 
119d60bafe5SSasha Levin 	return res;
120d60bafe5SSasha Levin }
121d60bafe5SSasha Levin 
122f6a3c571SSasha Levin int thread_pool__init(struct kvm *kvm)
123d60bafe5SSasha Levin {
124d60bafe5SSasha Levin 	unsigned long i;
125f6a3c571SSasha Levin 	unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
126f6a3c571SSasha Levin 
127f6a3c571SSasha Levin 	running = true;
128d60bafe5SSasha Levin 
129d60bafe5SSasha Levin 	for (i = 0; i < thread_count; i++)
130d60bafe5SSasha Levin 		if (thread_pool__addthread() < 0)
131d60bafe5SSasha Levin 			return i;
132d60bafe5SSasha Levin 
133d60bafe5SSasha Levin 	return i;
134d60bafe5SSasha Levin }
135*49a8afd1SSasha Levin late_init(thread_pool__init);
136d60bafe5SSasha Levin 
137f6a3c571SSasha Levin int thread_pool__exit(struct kvm *kvm)
138f6a3c571SSasha Levin {
139f6a3c571SSasha Levin 	int i;
140f6a3c571SSasha Levin 	void *NUL = NULL;
141f6a3c571SSasha Levin 
142f6a3c571SSasha Levin 	running = false;
143f6a3c571SSasha Levin 
144f6a3c571SSasha Levin 	for (i = 0; i < threadcount; i++) {
145f6a3c571SSasha Levin 		mutex_lock(&job_mutex);
146f6a3c571SSasha Levin 		pthread_cond_signal(&job_cond);
147f6a3c571SSasha Levin 		mutex_unlock(&job_mutex);
148f6a3c571SSasha Levin 	}
149f6a3c571SSasha Levin 
150f6a3c571SSasha Levin 	for (i = 0; i < threadcount; i++) {
151f6a3c571SSasha Levin 		pthread_join(threads[i], NUL);
152f6a3c571SSasha Levin 	}
153f6a3c571SSasha Levin 
154f6a3c571SSasha Levin 	return 0;
155f6a3c571SSasha Levin }
156*49a8afd1SSasha Levin late_exit(thread_pool__exit);
157f6a3c571SSasha Levin 
158df0c7f57SSasha Levin void thread_pool__do_job(struct thread_pool__job *job)
159d60bafe5SSasha Levin {
160f6a083e9SSasha Levin 	struct thread_pool__job *jobinfo = job;
161d60bafe5SSasha Levin 
162df0c7f57SSasha Levin 	if (jobinfo == NULL || jobinfo->callback == NULL)
163d60bafe5SSasha Levin 		return;
164d60bafe5SSasha Levin 
165d60bafe5SSasha Levin 	mutex_lock(&jobinfo->mutex);
166d60bafe5SSasha Levin 	if (jobinfo->signalcount++ == 0)
167a4cae5abSLai Jiangshan 		thread_pool__job_push(job);
168d60bafe5SSasha Levin 	mutex_unlock(&jobinfo->mutex);
169d60bafe5SSasha Levin 
1709449cd3cSSasha Levin 	mutex_lock(&job_mutex);
171d60bafe5SSasha Levin 	pthread_cond_signal(&job_cond);
1729449cd3cSSasha Levin 	mutex_unlock(&job_mutex);
173d60bafe5SSasha Levin }
174