xref: /kvmtool/util/threadpool.c (revision d60bafe5d43fb00da8c7e0b3675351fae9a75670)
1*d60bafe5SSasha Levin #include "kvm/threadpool.h"
2*d60bafe5SSasha Levin #include "kvm/mutex.h"
3*d60bafe5SSasha Levin 
4*d60bafe5SSasha Levin #include <linux/kernel.h>
5*d60bafe5SSasha Levin #include <linux/list.h>
6*d60bafe5SSasha Levin #include <pthread.h>
7*d60bafe5SSasha Levin #include <stdbool.h>
8*d60bafe5SSasha Levin 
9*d60bafe5SSasha Levin struct thread_pool__job_info {
10*d60bafe5SSasha Levin 	kvm_thread_callback_fn_t	callback;
11*d60bafe5SSasha Levin 	struct kvm			*kvm;
12*d60bafe5SSasha Levin 	void				*data;
13*d60bafe5SSasha Levin 
14*d60bafe5SSasha Levin 	int				signalcount;
15*d60bafe5SSasha Levin 	pthread_mutex_t			mutex;
16*d60bafe5SSasha Levin 
17*d60bafe5SSasha Levin 	struct list_head		queue;
18*d60bafe5SSasha Levin };
19*d60bafe5SSasha Levin 
20*d60bafe5SSasha Levin static pthread_mutex_t	job_mutex	= PTHREAD_MUTEX_INITIALIZER;
21*d60bafe5SSasha Levin static pthread_mutex_t	thread_mutex	= PTHREAD_MUTEX_INITIALIZER;
22*d60bafe5SSasha Levin static pthread_cond_t	job_cond	= PTHREAD_COND_INITIALIZER;
23*d60bafe5SSasha Levin 
24*d60bafe5SSasha Levin static LIST_HEAD(head);
25*d60bafe5SSasha Levin 
26*d60bafe5SSasha Levin static pthread_t	*threads;
27*d60bafe5SSasha Levin static long		threadcount;
28*d60bafe5SSasha Levin 
29*d60bafe5SSasha Levin static struct thread_pool__job_info *thread_pool__job_info_pop(void)
30*d60bafe5SSasha Levin {
31*d60bafe5SSasha Levin 	struct thread_pool__job_info *job;
32*d60bafe5SSasha Levin 
33*d60bafe5SSasha Levin 	if (list_empty(&head))
34*d60bafe5SSasha Levin 		return NULL;
35*d60bafe5SSasha Levin 
36*d60bafe5SSasha Levin 	job = list_first_entry(&head, struct thread_pool__job_info, queue);
37*d60bafe5SSasha Levin 	list_del(&job->queue);
38*d60bafe5SSasha Levin 
39*d60bafe5SSasha Levin 	return job;
40*d60bafe5SSasha Levin }
41*d60bafe5SSasha Levin 
42*d60bafe5SSasha Levin static void thread_pool__job_info_push(struct thread_pool__job_info *job)
43*d60bafe5SSasha Levin {
44*d60bafe5SSasha Levin 	list_add_tail(&job->queue, &head);
45*d60bafe5SSasha Levin }
46*d60bafe5SSasha Levin 
47*d60bafe5SSasha Levin static struct thread_pool__job_info *thread_pool__job_info_pop_locked(void)
48*d60bafe5SSasha Levin {
49*d60bafe5SSasha Levin 	struct thread_pool__job_info *job;
50*d60bafe5SSasha Levin 
51*d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
52*d60bafe5SSasha Levin 	job = thread_pool__job_info_pop();
53*d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
54*d60bafe5SSasha Levin 	return job;
55*d60bafe5SSasha Levin }
56*d60bafe5SSasha Levin 
57*d60bafe5SSasha Levin static void thread_pool__job_info_push_locked(struct thread_pool__job_info *job)
58*d60bafe5SSasha Levin {
59*d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
60*d60bafe5SSasha Levin 	thread_pool__job_info_push(job);
61*d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
62*d60bafe5SSasha Levin }
63*d60bafe5SSasha Levin 
64*d60bafe5SSasha Levin static void thread_pool__handle_job(struct thread_pool__job_info *job)
65*d60bafe5SSasha Levin {
66*d60bafe5SSasha Levin 	while (job) {
67*d60bafe5SSasha Levin 		job->callback(job->kvm, job->data);
68*d60bafe5SSasha Levin 
69*d60bafe5SSasha Levin 		mutex_lock(&job->mutex);
70*d60bafe5SSasha Levin 
71*d60bafe5SSasha Levin 		if (--job->signalcount > 0)
72*d60bafe5SSasha Levin 			/* If the job was signaled again while we were working */
73*d60bafe5SSasha Levin 			thread_pool__job_info_push_locked(job);
74*d60bafe5SSasha Levin 
75*d60bafe5SSasha Levin 		mutex_unlock(&job->mutex);
76*d60bafe5SSasha Levin 
77*d60bafe5SSasha Levin 		job = thread_pool__job_info_pop_locked();
78*d60bafe5SSasha Levin 	}
79*d60bafe5SSasha Levin }
80*d60bafe5SSasha Levin 
81*d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param)
82*d60bafe5SSasha Levin {
83*d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
84*d60bafe5SSasha Levin }
85*d60bafe5SSasha Levin 
86*d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param)
87*d60bafe5SSasha Levin {
88*d60bafe5SSasha Levin 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
89*d60bafe5SSasha Levin 
90*d60bafe5SSasha Levin 	for (;;) {
91*d60bafe5SSasha Levin 		struct thread_pool__job_info *curjob;
92*d60bafe5SSasha Levin 
93*d60bafe5SSasha Levin 		mutex_lock(&job_mutex);
94*d60bafe5SSasha Levin 		pthread_cond_wait(&job_cond, &job_mutex);
95*d60bafe5SSasha Levin 		curjob = thread_pool__job_info_pop();
96*d60bafe5SSasha Levin 		mutex_unlock(&job_mutex);
97*d60bafe5SSasha Levin 
98*d60bafe5SSasha Levin 		if (curjob)
99*d60bafe5SSasha Levin 			thread_pool__handle_job(curjob);
100*d60bafe5SSasha Levin 	}
101*d60bafe5SSasha Levin 
102*d60bafe5SSasha Levin 	pthread_cleanup_pop(0);
103*d60bafe5SSasha Levin 
104*d60bafe5SSasha Levin 	return NULL;
105*d60bafe5SSasha Levin }
106*d60bafe5SSasha Levin 
107*d60bafe5SSasha Levin static int thread_pool__addthread(void)
108*d60bafe5SSasha Levin {
109*d60bafe5SSasha Levin 	int res;
110*d60bafe5SSasha Levin 	void *newthreads;
111*d60bafe5SSasha Levin 
112*d60bafe5SSasha Levin 	mutex_lock(&thread_mutex);
113*d60bafe5SSasha Levin 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
114*d60bafe5SSasha Levin 	if (newthreads == NULL) {
115*d60bafe5SSasha Levin 		mutex_unlock(&thread_mutex);
116*d60bafe5SSasha Levin 		return -1;
117*d60bafe5SSasha Levin 	}
118*d60bafe5SSasha Levin 
119*d60bafe5SSasha Levin 	threads = newthreads;
120*d60bafe5SSasha Levin 
121*d60bafe5SSasha Levin 	res = pthread_create(threads + threadcount, NULL,
122*d60bafe5SSasha Levin 			     thread_pool__threadfunc, NULL);
123*d60bafe5SSasha Levin 
124*d60bafe5SSasha Levin 	if (res == 0)
125*d60bafe5SSasha Levin 		threadcount++;
126*d60bafe5SSasha Levin 	mutex_unlock(&thread_mutex);
127*d60bafe5SSasha Levin 
128*d60bafe5SSasha Levin 	return res;
129*d60bafe5SSasha Levin }
130*d60bafe5SSasha Levin 
131*d60bafe5SSasha Levin int thread_pool__init(unsigned long thread_count)
132*d60bafe5SSasha Levin {
133*d60bafe5SSasha Levin 	unsigned long i;
134*d60bafe5SSasha Levin 
135*d60bafe5SSasha Levin 	for (i = 0; i < thread_count; i++)
136*d60bafe5SSasha Levin 		if (thread_pool__addthread() < 0)
137*d60bafe5SSasha Levin 			return i;
138*d60bafe5SSasha Levin 
139*d60bafe5SSasha Levin 	return i;
140*d60bafe5SSasha Levin }
141*d60bafe5SSasha Levin 
142*d60bafe5SSasha Levin void *thread_pool__add_jobtype(struct kvm *kvm,
143*d60bafe5SSasha Levin 			       kvm_thread_callback_fn_t callback, void *data)
144*d60bafe5SSasha Levin {
145*d60bafe5SSasha Levin 	struct thread_pool__job_info *job = calloc(1, sizeof(*job));
146*d60bafe5SSasha Levin 
147*d60bafe5SSasha Levin 	*job = (struct thread_pool__job_info) {
148*d60bafe5SSasha Levin 		.kvm		= kvm,
149*d60bafe5SSasha Levin 		.data		= data,
150*d60bafe5SSasha Levin 		.callback	= callback,
151*d60bafe5SSasha Levin 		.mutex		= PTHREAD_MUTEX_INITIALIZER
152*d60bafe5SSasha Levin 	};
153*d60bafe5SSasha Levin 
154*d60bafe5SSasha Levin 	return job;
155*d60bafe5SSasha Levin }
156*d60bafe5SSasha Levin 
157*d60bafe5SSasha Levin void thread_pool__signal_work(void *job)
158*d60bafe5SSasha Levin {
159*d60bafe5SSasha Levin 	struct thread_pool__job_info *jobinfo = job;
160*d60bafe5SSasha Levin 
161*d60bafe5SSasha Levin 	if (jobinfo == NULL)
162*d60bafe5SSasha Levin 		return;
163*d60bafe5SSasha Levin 
164*d60bafe5SSasha Levin 	mutex_lock(&jobinfo->mutex);
165*d60bafe5SSasha Levin 	if (jobinfo->signalcount++ == 0)
166*d60bafe5SSasha Levin 		thread_pool__job_info_push_locked(job);
167*d60bafe5SSasha Levin 	mutex_unlock(&jobinfo->mutex);
168*d60bafe5SSasha Levin 
169*d60bafe5SSasha Levin 	pthread_cond_signal(&job_cond);
170*d60bafe5SSasha Levin }
171