xref: /kvmtool/util/threadpool.c (revision a4cae5aba58cc7ad541c885ea48fcc93434e0a16)
1d60bafe5SSasha Levin #include "kvm/threadpool.h"
2d60bafe5SSasha Levin #include "kvm/mutex.h"
3d60bafe5SSasha Levin 
4d60bafe5SSasha Levin #include <linux/kernel.h>
5d60bafe5SSasha Levin #include <linux/list.h>
6d60bafe5SSasha Levin #include <pthread.h>
7d60bafe5SSasha Levin #include <stdbool.h>
8d60bafe5SSasha Levin 
/* Serialises every access to the pending-job list anchored at 'head'. */
static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Workers sleep on this (with job_mutex held) until a job is signalled. */
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;
/* Queue of pending jobs: pushed at the tail, popped from the front. */
static LIST_HEAD(head);

/* Held while the 'threads' array is grown and 'threadcount' is updated. */
static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Heap array of worker thread handles, resized by thread_pool__addthread(). */
static pthread_t *threads;
/* Number of valid entries in 'threads'. */
static long threadcount;
17d60bafe5SSasha Levin 
18*a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop_locked(void)
19d60bafe5SSasha Levin {
20f6a083e9SSasha Levin 	struct thread_pool__job *job;
21d60bafe5SSasha Levin 
22d60bafe5SSasha Levin 	if (list_empty(&head))
23d60bafe5SSasha Levin 		return NULL;
24d60bafe5SSasha Levin 
25f6a083e9SSasha Levin 	job = list_first_entry(&head, struct thread_pool__job, queue);
26d60bafe5SSasha Levin 	list_del(&job->queue);
27d60bafe5SSasha Levin 
28d60bafe5SSasha Levin 	return job;
29d60bafe5SSasha Levin }
30d60bafe5SSasha Levin 
31*a4cae5abSLai Jiangshan static void thread_pool__job_push_locked(struct thread_pool__job *job)
32d60bafe5SSasha Levin {
33d60bafe5SSasha Levin 	list_add_tail(&job->queue, &head);
34d60bafe5SSasha Levin }
35d60bafe5SSasha Levin 
36*a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop(void)
37d60bafe5SSasha Levin {
38f6a083e9SSasha Levin 	struct thread_pool__job *job;
39d60bafe5SSasha Levin 
40d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
41*a4cae5abSLai Jiangshan 	job = thread_pool__job_pop_locked();
42d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
43d60bafe5SSasha Levin 	return job;
44d60bafe5SSasha Levin }
45d60bafe5SSasha Levin 
46*a4cae5abSLai Jiangshan static void thread_pool__job_push(struct thread_pool__job *job)
47d60bafe5SSasha Levin {
48d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
49*a4cae5abSLai Jiangshan 	thread_pool__job_push_locked(job);
50d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
51d60bafe5SSasha Levin }
52d60bafe5SSasha Levin 
53f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job)
54d60bafe5SSasha Levin {
55d60bafe5SSasha Levin 	while (job) {
56d60bafe5SSasha Levin 		job->callback(job->kvm, job->data);
57d60bafe5SSasha Levin 
58d60bafe5SSasha Levin 		mutex_lock(&job->mutex);
59d60bafe5SSasha Levin 
60d60bafe5SSasha Levin 		if (--job->signalcount > 0)
61d60bafe5SSasha Levin 			/* If the job was signaled again while we were working */
62*a4cae5abSLai Jiangshan 			thread_pool__job_push(job);
63d60bafe5SSasha Levin 
64d60bafe5SSasha Levin 		mutex_unlock(&job->mutex);
65d60bafe5SSasha Levin 
66*a4cae5abSLai Jiangshan 		job = thread_pool__job_pop();
67d60bafe5SSasha Levin 	}
68d60bafe5SSasha Levin }
69d60bafe5SSasha Levin 
70d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param)
71d60bafe5SSasha Levin {
72d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
73d60bafe5SSasha Levin }
74d60bafe5SSasha Levin 
75d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param)
76d60bafe5SSasha Levin {
77d60bafe5SSasha Levin 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
78d60bafe5SSasha Levin 
79d60bafe5SSasha Levin 	for (;;) {
80f6a083e9SSasha Levin 		struct thread_pool__job *curjob;
81d60bafe5SSasha Levin 
82d60bafe5SSasha Levin 		mutex_lock(&job_mutex);
83*a4cae5abSLai Jiangshan 		while ((curjob = thread_pool__job_pop_locked()) == NULL)
84d60bafe5SSasha Levin 			pthread_cond_wait(&job_cond, &job_mutex);
85d60bafe5SSasha Levin 		mutex_unlock(&job_mutex);
86d60bafe5SSasha Levin 
87d60bafe5SSasha Levin 		thread_pool__handle_job(curjob);
88d60bafe5SSasha Levin 	}
89d60bafe5SSasha Levin 
90d60bafe5SSasha Levin 	pthread_cleanup_pop(0);
91d60bafe5SSasha Levin 
92d60bafe5SSasha Levin 	return NULL;
93d60bafe5SSasha Levin }
94d60bafe5SSasha Levin 
95d60bafe5SSasha Levin static int thread_pool__addthread(void)
96d60bafe5SSasha Levin {
97d60bafe5SSasha Levin 	int res;
98d60bafe5SSasha Levin 	void *newthreads;
99d60bafe5SSasha Levin 
100d60bafe5SSasha Levin 	mutex_lock(&thread_mutex);
101d60bafe5SSasha Levin 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
102d60bafe5SSasha Levin 	if (newthreads == NULL) {
103d60bafe5SSasha Levin 		mutex_unlock(&thread_mutex);
104d60bafe5SSasha Levin 		return -1;
105d60bafe5SSasha Levin 	}
106d60bafe5SSasha Levin 
107d60bafe5SSasha Levin 	threads = newthreads;
108d60bafe5SSasha Levin 
109d60bafe5SSasha Levin 	res = pthread_create(threads + threadcount, NULL,
110d60bafe5SSasha Levin 			     thread_pool__threadfunc, NULL);
111d60bafe5SSasha Levin 
112d60bafe5SSasha Levin 	if (res == 0)
113d60bafe5SSasha Levin 		threadcount++;
114d60bafe5SSasha Levin 	mutex_unlock(&thread_mutex);
115d60bafe5SSasha Levin 
116d60bafe5SSasha Levin 	return res;
117d60bafe5SSasha Levin }
118d60bafe5SSasha Levin 
/*
 * Try to start @thread_count worker threads, one at a time.
 *
 * Stops at the first thread_pool__addthread() call that returns a negative
 * value. Returns the number of calls that did not report such a failure,
 * which equals @thread_count when none did.
 */
int thread_pool__init(unsigned long thread_count)
{
	unsigned long started = 0;

	while (started < thread_count) {
		if (thread_pool__addthread() < 0)
			break;
		started++;
	}

	return started;
}
129d60bafe5SSasha Levin 
130df0c7f57SSasha Levin void thread_pool__do_job(struct thread_pool__job *job)
131d60bafe5SSasha Levin {
132f6a083e9SSasha Levin 	struct thread_pool__job *jobinfo = job;
133d60bafe5SSasha Levin 
134df0c7f57SSasha Levin 	if (jobinfo == NULL || jobinfo->callback == NULL)
135d60bafe5SSasha Levin 		return;
136d60bafe5SSasha Levin 
137d60bafe5SSasha Levin 	mutex_lock(&jobinfo->mutex);
138d60bafe5SSasha Levin 	if (jobinfo->signalcount++ == 0)
139*a4cae5abSLai Jiangshan 		thread_pool__job_push(job);
140d60bafe5SSasha Levin 	mutex_unlock(&jobinfo->mutex);
141d60bafe5SSasha Levin 
1429449cd3cSSasha Levin 	mutex_lock(&job_mutex);
143d60bafe5SSasha Levin 	pthread_cond_signal(&job_cond);
1449449cd3cSSasha Levin 	mutex_unlock(&job_mutex);
145d60bafe5SSasha Levin }
146