xref: /kvmtool/util/threadpool.c (revision f6a083e9252fd761b8510cd12382da325ace7344)
1d60bafe5SSasha Levin #include "kvm/threadpool.h"
2d60bafe5SSasha Levin #include "kvm/mutex.h"
3d60bafe5SSasha Levin 
4d60bafe5SSasha Levin #include <linux/kernel.h>
5d60bafe5SSasha Levin #include <linux/list.h>
6d60bafe5SSasha Levin #include <pthread.h>
7d60bafe5SSasha Levin #include <stdbool.h>
8d60bafe5SSasha Levin 
9*f6a083e9SSasha Levin struct thread_pool__job {
10d60bafe5SSasha Levin 	kvm_thread_callback_fn_t	callback;
11d60bafe5SSasha Levin 	struct kvm			*kvm;
12d60bafe5SSasha Levin 	void				*data;
13d60bafe5SSasha Levin 
14d60bafe5SSasha Levin 	int				signalcount;
15d60bafe5SSasha Levin 	pthread_mutex_t			mutex;
16d60bafe5SSasha Levin 
17d60bafe5SSasha Levin 	struct list_head		queue;
18d60bafe5SSasha Levin };
19d60bafe5SSasha Levin 
/* Taken around every access to the pending-job list below. */
static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Taken while the worker-thread table is grown. */
static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by thread_pool__do_job(); worker threads wait on it. */
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;

/* Pending jobs: pushed at the tail, popped from the front. */
static LIST_HEAD(head);

/* Table of created worker threads and the number of entries in it. */
static pthread_t *threads;
static long threadcount;
28d60bafe5SSasha Levin 
29*f6a083e9SSasha Levin static struct thread_pool__job *thread_pool__job_pop(void)
30d60bafe5SSasha Levin {
31*f6a083e9SSasha Levin 	struct thread_pool__job *job;
32d60bafe5SSasha Levin 
33d60bafe5SSasha Levin 	if (list_empty(&head))
34d60bafe5SSasha Levin 		return NULL;
35d60bafe5SSasha Levin 
36*f6a083e9SSasha Levin 	job = list_first_entry(&head, struct thread_pool__job, queue);
37d60bafe5SSasha Levin 	list_del(&job->queue);
38d60bafe5SSasha Levin 
39d60bafe5SSasha Levin 	return job;
40d60bafe5SSasha Levin }
41d60bafe5SSasha Levin 
42*f6a083e9SSasha Levin static void thread_pool__job_push(struct thread_pool__job *job)
43d60bafe5SSasha Levin {
44d60bafe5SSasha Levin 	list_add_tail(&job->queue, &head);
45d60bafe5SSasha Levin }
46d60bafe5SSasha Levin 
47*f6a083e9SSasha Levin static struct thread_pool__job *thread_pool__job_pop_locked(void)
48d60bafe5SSasha Levin {
49*f6a083e9SSasha Levin 	struct thread_pool__job *job;
50d60bafe5SSasha Levin 
51d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
52*f6a083e9SSasha Levin 	job = thread_pool__job_pop();
53d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
54d60bafe5SSasha Levin 	return job;
55d60bafe5SSasha Levin }
56d60bafe5SSasha Levin 
57*f6a083e9SSasha Levin static void thread_pool__job_push_locked(struct thread_pool__job *job)
58d60bafe5SSasha Levin {
59d60bafe5SSasha Levin 	mutex_lock(&job_mutex);
60*f6a083e9SSasha Levin 	thread_pool__job_push(job);
61d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
62d60bafe5SSasha Levin }
63d60bafe5SSasha Levin 
64*f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job)
65d60bafe5SSasha Levin {
66d60bafe5SSasha Levin 	while (job) {
67d60bafe5SSasha Levin 		job->callback(job->kvm, job->data);
68d60bafe5SSasha Levin 
69d60bafe5SSasha Levin 		mutex_lock(&job->mutex);
70d60bafe5SSasha Levin 
71d60bafe5SSasha Levin 		if (--job->signalcount > 0)
72d60bafe5SSasha Levin 			/* If the job was signaled again while we were working */
73*f6a083e9SSasha Levin 			thread_pool__job_push_locked(job);
74d60bafe5SSasha Levin 
75d60bafe5SSasha Levin 		mutex_unlock(&job->mutex);
76d60bafe5SSasha Levin 
77*f6a083e9SSasha Levin 		job = thread_pool__job_pop_locked();
78d60bafe5SSasha Levin 	}
79d60bafe5SSasha Levin }
80d60bafe5SSasha Levin 
81d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param)
82d60bafe5SSasha Levin {
83d60bafe5SSasha Levin 	mutex_unlock(&job_mutex);
84d60bafe5SSasha Levin }
85d60bafe5SSasha Levin 
86d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param)
87d60bafe5SSasha Levin {
88d60bafe5SSasha Levin 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
89d60bafe5SSasha Levin 
90d60bafe5SSasha Levin 	for (;;) {
91*f6a083e9SSasha Levin 		struct thread_pool__job *curjob;
92d60bafe5SSasha Levin 
93d60bafe5SSasha Levin 		mutex_lock(&job_mutex);
94d60bafe5SSasha Levin 		pthread_cond_wait(&job_cond, &job_mutex);
95*f6a083e9SSasha Levin 		curjob = thread_pool__job_pop();
96d60bafe5SSasha Levin 		mutex_unlock(&job_mutex);
97d60bafe5SSasha Levin 
98d60bafe5SSasha Levin 		if (curjob)
99d60bafe5SSasha Levin 			thread_pool__handle_job(curjob);
100d60bafe5SSasha Levin 	}
101d60bafe5SSasha Levin 
102d60bafe5SSasha Levin 	pthread_cleanup_pop(0);
103d60bafe5SSasha Levin 
104d60bafe5SSasha Levin 	return NULL;
105d60bafe5SSasha Levin }
106d60bafe5SSasha Levin 
107d60bafe5SSasha Levin static int thread_pool__addthread(void)
108d60bafe5SSasha Levin {
109d60bafe5SSasha Levin 	int res;
110d60bafe5SSasha Levin 	void *newthreads;
111d60bafe5SSasha Levin 
112d60bafe5SSasha Levin 	mutex_lock(&thread_mutex);
113d60bafe5SSasha Levin 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
114d60bafe5SSasha Levin 	if (newthreads == NULL) {
115d60bafe5SSasha Levin 		mutex_unlock(&thread_mutex);
116d60bafe5SSasha Levin 		return -1;
117d60bafe5SSasha Levin 	}
118d60bafe5SSasha Levin 
119d60bafe5SSasha Levin 	threads = newthreads;
120d60bafe5SSasha Levin 
121d60bafe5SSasha Levin 	res = pthread_create(threads + threadcount, NULL,
122d60bafe5SSasha Levin 			     thread_pool__threadfunc, NULL);
123d60bafe5SSasha Levin 
124d60bafe5SSasha Levin 	if (res == 0)
125d60bafe5SSasha Levin 		threadcount++;
126d60bafe5SSasha Levin 	mutex_unlock(&thread_mutex);
127d60bafe5SSasha Levin 
128d60bafe5SSasha Levin 	return res;
129d60bafe5SSasha Levin }
130d60bafe5SSasha Levin 
/*
 * Start up to @thread_count worker threads.
 *
 * Stops at the first worker that cannot be created.
 *
 * Returns the number of workers actually started, which is smaller than
 * @thread_count if a failure cut the loop short.
 */
int thread_pool__init(unsigned long thread_count)
{
	unsigned long i;

	for (i = 0; i < thread_count; i++) {
		/*
		 * thread_pool__addthread() reports failure either as -1
		 * (allocation) or as the positive error number handed back
		 * by pthread_create(), so any non-zero value is a failure.
		 */
		if (thread_pool__addthread() != 0)
			break;
	}

	return i;
}
141d60bafe5SSasha Levin 
142*f6a083e9SSasha Levin void *thread_pool__add_job(struct kvm *kvm,
143d60bafe5SSasha Levin 			       kvm_thread_callback_fn_t callback, void *data)
144d60bafe5SSasha Levin {
145*f6a083e9SSasha Levin 	struct thread_pool__job *job = calloc(1, sizeof(*job));
146d60bafe5SSasha Levin 
147*f6a083e9SSasha Levin 	*job = (struct thread_pool__job) {
148d60bafe5SSasha Levin 		.kvm		= kvm,
149d60bafe5SSasha Levin 		.data		= data,
150d60bafe5SSasha Levin 		.callback	= callback,
151d60bafe5SSasha Levin 		.mutex		= PTHREAD_MUTEX_INITIALIZER
152d60bafe5SSasha Levin 	};
153d60bafe5SSasha Levin 
154d60bafe5SSasha Levin 	return job;
155d60bafe5SSasha Levin }
156d60bafe5SSasha Levin 
157*f6a083e9SSasha Levin void thread_pool__do_job(void *job)
158d60bafe5SSasha Levin {
159*f6a083e9SSasha Levin 	struct thread_pool__job *jobinfo = job;
160d60bafe5SSasha Levin 
161d60bafe5SSasha Levin 	if (jobinfo == NULL)
162d60bafe5SSasha Levin 		return;
163d60bafe5SSasha Levin 
164d60bafe5SSasha Levin 	mutex_lock(&jobinfo->mutex);
165d60bafe5SSasha Levin 	if (jobinfo->signalcount++ == 0)
166*f6a083e9SSasha Levin 		thread_pool__job_push_locked(job);
167d60bafe5SSasha Levin 	mutex_unlock(&jobinfo->mutex);
168d60bafe5SSasha Levin 
169d60bafe5SSasha Levin 	pthread_cond_signal(&job_cond);
170d60bafe5SSasha Levin }
171