xref: /kvmtool/util/threadpool.c (revision 9449cd3c7b42845d671c0222e8aed82b4e9b723a)
1 #include "kvm/threadpool.h"
2 #include "kvm/mutex.h"
3 
4 #include <linux/kernel.h>
5 #include <linux/list.h>
6 #include <pthread.h>
7 #include <stdbool.h>
8 
9 struct thread_pool__job {
10 	kvm_thread_callback_fn_t	callback;
11 	struct kvm			*kvm;
12 	void				*data;
13 
14 	int				signalcount;
15 	pthread_mutex_t			mutex;
16 
17 	struct list_head		queue;
18 };
19 
/* Taken around accesses to the pending-job list and paired with job_cond. */
static pthread_mutex_t	job_mutex	= PTHREAD_MUTEX_INITIALIZER;
/* Taken while the threads array and threadcount are being updated. */
static pthread_mutex_t	thread_mutex	= PTHREAD_MUTEX_INITIALIZER;
/* Signalled by thread_pool__do_job(); worker threads wait on it. */
static pthread_cond_t	job_cond	= PTHREAD_COND_INITIALIZER;

/* Queue of jobs waiting for a worker; new jobs are appended at the tail. */
static LIST_HEAD(head);

/* Dynamically grown array holding the handle of each worker thread. */
static pthread_t	*threads;
/* Number of valid entries in threads. */
static long		threadcount;
28 
29 static struct thread_pool__job *thread_pool__job_pop(void)
30 {
31 	struct thread_pool__job *job;
32 
33 	if (list_empty(&head))
34 		return NULL;
35 
36 	job = list_first_entry(&head, struct thread_pool__job, queue);
37 	list_del(&job->queue);
38 
39 	return job;
40 }
41 
42 static void thread_pool__job_push(struct thread_pool__job *job)
43 {
44 	list_add_tail(&job->queue, &head);
45 }
46 
47 static struct thread_pool__job *thread_pool__job_pop_locked(void)
48 {
49 	struct thread_pool__job *job;
50 
51 	mutex_lock(&job_mutex);
52 	job = thread_pool__job_pop();
53 	mutex_unlock(&job_mutex);
54 	return job;
55 }
56 
57 static void thread_pool__job_push_locked(struct thread_pool__job *job)
58 {
59 	mutex_lock(&job_mutex);
60 	thread_pool__job_push(job);
61 	mutex_unlock(&job_mutex);
62 }
63 
64 static void thread_pool__handle_job(struct thread_pool__job *job)
65 {
66 	while (job) {
67 		job->callback(job->kvm, job->data);
68 
69 		mutex_lock(&job->mutex);
70 
71 		if (--job->signalcount > 0)
72 			/* If the job was signaled again while we were working */
73 			thread_pool__job_push_locked(job);
74 
75 		mutex_unlock(&job->mutex);
76 
77 		job = thread_pool__job_pop_locked();
78 	}
79 }
80 
81 static void thread_pool__threadfunc_cleanup(void *param)
82 {
83 	mutex_unlock(&job_mutex);
84 }
85 
86 static void *thread_pool__threadfunc(void *param)
87 {
88 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
89 
90 	for (;;) {
91 		struct thread_pool__job *curjob;
92 
93 		mutex_lock(&job_mutex);
94 		pthread_cond_wait(&job_cond, &job_mutex);
95 		curjob = thread_pool__job_pop();
96 		mutex_unlock(&job_mutex);
97 
98 		if (curjob)
99 			thread_pool__handle_job(curjob);
100 	}
101 
102 	pthread_cleanup_pop(0);
103 
104 	return NULL;
105 }
106 
107 static int thread_pool__addthread(void)
108 {
109 	int res;
110 	void *newthreads;
111 
112 	mutex_lock(&thread_mutex);
113 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
114 	if (newthreads == NULL) {
115 		mutex_unlock(&thread_mutex);
116 		return -1;
117 	}
118 
119 	threads = newthreads;
120 
121 	res = pthread_create(threads + threadcount, NULL,
122 			     thread_pool__threadfunc, NULL);
123 
124 	if (res == 0)
125 		threadcount++;
126 	mutex_unlock(&thread_mutex);
127 
128 	return res;
129 }
130 
/*
 * Start up to thread_count worker threads.
 *
 * Stops at the first thread that cannot be added and returns the number
 * of workers that were started, which is less than thread_count when a
 * failure occurred.
 */
int thread_pool__init(unsigned long thread_count)
{
	unsigned long i;

	for (i = 0; i < thread_count; i++) {
		/*
		 * Any non-zero result is a failure: the helper can pass
		 * through pthread_create()'s positive error number as well
		 * as return a negative value.
		 */
		if (thread_pool__addthread() != 0)
			break;
	}

	return i;
}
141 
142 void *thread_pool__add_job(struct kvm *kvm,
143 			       kvm_thread_callback_fn_t callback, void *data)
144 {
145 	struct thread_pool__job *job = calloc(1, sizeof(*job));
146 
147 	*job = (struct thread_pool__job) {
148 		.kvm		= kvm,
149 		.data		= data,
150 		.callback	= callback,
151 		.mutex		= PTHREAD_MUTEX_INITIALIZER
152 	};
153 
154 	return job;
155 }
156 
157 void thread_pool__do_job(void *job)
158 {
159 	struct thread_pool__job *jobinfo = job;
160 
161 	if (jobinfo == NULL)
162 		return;
163 
164 	mutex_lock(&jobinfo->mutex);
165 	if (jobinfo->signalcount++ == 0)
166 		thread_pool__job_push_locked(job);
167 	mutex_unlock(&jobinfo->mutex);
168 
169 	mutex_lock(&job_mutex);
170 	pthread_cond_signal(&job_cond);
171 	mutex_unlock(&job_mutex);
172 }
173