xref: /kvmtool/util/threadpool.c (revision 49a8afd1b9a4e503bdafb2bbc04549e03d514836)
1 #include "kvm/threadpool.h"
2 #include "kvm/mutex.h"
3 #include "kvm/kvm.h"
4 
5 #include <linux/kernel.h>
6 #include <linux/list.h>
7 #include <pthread.h>
8 #include <stdbool.h>
9 
10 static pthread_mutex_t	job_mutex	= PTHREAD_MUTEX_INITIALIZER;
11 static pthread_mutex_t	thread_mutex	= PTHREAD_MUTEX_INITIALIZER;
12 static pthread_cond_t	job_cond	= PTHREAD_COND_INITIALIZER;
13 
14 static LIST_HEAD(head);
15 
16 static pthread_t	*threads;
17 static long		threadcount;
18 static bool		running;
19 
20 static struct thread_pool__job *thread_pool__job_pop_locked(void)
21 {
22 	struct thread_pool__job *job;
23 
24 	if (list_empty(&head))
25 		return NULL;
26 
27 	job = list_first_entry(&head, struct thread_pool__job, queue);
28 	list_del(&job->queue);
29 
30 	return job;
31 }
32 
33 static void thread_pool__job_push_locked(struct thread_pool__job *job)
34 {
35 	list_add_tail(&job->queue, &head);
36 }
37 
38 static struct thread_pool__job *thread_pool__job_pop(void)
39 {
40 	struct thread_pool__job *job;
41 
42 	mutex_lock(&job_mutex);
43 	job = thread_pool__job_pop_locked();
44 	mutex_unlock(&job_mutex);
45 	return job;
46 }
47 
48 static void thread_pool__job_push(struct thread_pool__job *job)
49 {
50 	mutex_lock(&job_mutex);
51 	thread_pool__job_push_locked(job);
52 	mutex_unlock(&job_mutex);
53 }
54 
55 static void thread_pool__handle_job(struct thread_pool__job *job)
56 {
57 	while (job) {
58 		job->callback(job->kvm, job->data);
59 
60 		mutex_lock(&job->mutex);
61 
62 		if (--job->signalcount > 0)
63 			/* If the job was signaled again while we were working */
64 			thread_pool__job_push(job);
65 
66 		mutex_unlock(&job->mutex);
67 
68 		job = thread_pool__job_pop();
69 	}
70 }
71 
72 static void thread_pool__threadfunc_cleanup(void *param)
73 {
74 	mutex_unlock(&job_mutex);
75 }
76 
77 static void *thread_pool__threadfunc(void *param)
78 {
79 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
80 
81 	while (running) {
82 		struct thread_pool__job *curjob = NULL;
83 
84 		mutex_lock(&job_mutex);
85 		while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
86 			pthread_cond_wait(&job_cond, &job_mutex);
87 		mutex_unlock(&job_mutex);
88 
89 		if (running)
90 			thread_pool__handle_job(curjob);
91 	}
92 
93 	pthread_cleanup_pop(0);
94 
95 	return NULL;
96 }
97 
98 static int thread_pool__addthread(void)
99 {
100 	int res;
101 	void *newthreads;
102 
103 	mutex_lock(&thread_mutex);
104 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
105 	if (newthreads == NULL) {
106 		mutex_unlock(&thread_mutex);
107 		return -1;
108 	}
109 
110 	threads = newthreads;
111 
112 	res = pthread_create(threads + threadcount, NULL,
113 			     thread_pool__threadfunc, NULL);
114 
115 	if (res == 0)
116 		threadcount++;
117 	mutex_unlock(&thread_mutex);
118 
119 	return res;
120 }
121 
122 int thread_pool__init(struct kvm *kvm)
123 {
124 	unsigned long i;
125 	unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
126 
127 	running = true;
128 
129 	for (i = 0; i < thread_count; i++)
130 		if (thread_pool__addthread() < 0)
131 			return i;
132 
133 	return i;
134 }
135 late_init(thread_pool__init);
136 
137 int thread_pool__exit(struct kvm *kvm)
138 {
139 	int i;
140 	void *NUL = NULL;
141 
142 	running = false;
143 
144 	for (i = 0; i < threadcount; i++) {
145 		mutex_lock(&job_mutex);
146 		pthread_cond_signal(&job_cond);
147 		mutex_unlock(&job_mutex);
148 	}
149 
150 	for (i = 0; i < threadcount; i++) {
151 		pthread_join(threads[i], NUL);
152 	}
153 
154 	return 0;
155 }
156 late_exit(thread_pool__exit);
157 
158 void thread_pool__do_job(struct thread_pool__job *job)
159 {
160 	struct thread_pool__job *jobinfo = job;
161 
162 	if (jobinfo == NULL || jobinfo->callback == NULL)
163 		return;
164 
165 	mutex_lock(&jobinfo->mutex);
166 	if (jobinfo->signalcount++ == 0)
167 		thread_pool__job_push(job);
168 	mutex_unlock(&jobinfo->mutex);
169 
170 	mutex_lock(&job_mutex);
171 	pthread_cond_signal(&job_cond);
172 	mutex_unlock(&job_mutex);
173 }
174