xref: /kvmtool/util/threadpool.c (revision 7a7f45421f3a04e25d274c207ddd110af69da1bb)
1 #include "kvm/threadpool.h"
2 #include "kvm/mutex.h"
3 #include "kvm/kvm.h"
4 
5 #include <linux/kernel.h>
6 #include <linux/list.h>
7 #include <pthread.h>
8 #include <stdbool.h>
9 
/* Protects the pending-job list `head`; also the mutex paired with job_cond. */
static DEFINE_MUTEX(job_mutex);
/* Held while the `threads` array is grown and `threadcount` is updated. */
static DEFINE_MUTEX(thread_mutex);
/* Signalled when a job is queued and when the pool is shut down. */
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;

/* Queue of pending jobs, linked through the `queue` member of each job. */
static LIST_HEAD(head);

/* Array of worker thread handles, grown one slot at a time with realloc(). */
static pthread_t	*threads;
/* Number of successfully created workers stored in `threads`. */
static long		threadcount;
/* Set by thread_pool__init() and cleared by thread_pool__exit(); workers loop while it is true. */
static bool		running;
19 
thread_pool__job_pop_locked(void)20 static struct thread_pool__job *thread_pool__job_pop_locked(void)
21 {
22 	struct thread_pool__job *job;
23 
24 	if (list_empty(&head))
25 		return NULL;
26 
27 	job = list_first_entry(&head, struct thread_pool__job, queue);
28 	list_del_init(&job->queue);
29 
30 	return job;
31 }
32 
thread_pool__job_push_locked(struct thread_pool__job * job)33 static void thread_pool__job_push_locked(struct thread_pool__job *job)
34 {
35 	list_add_tail(&job->queue, &head);
36 }
37 
thread_pool__job_pop(void)38 static struct thread_pool__job *thread_pool__job_pop(void)
39 {
40 	struct thread_pool__job *job;
41 
42 	mutex_lock(&job_mutex);
43 	job = thread_pool__job_pop_locked();
44 	mutex_unlock(&job_mutex);
45 	return job;
46 }
47 
thread_pool__job_push(struct thread_pool__job * job)48 static void thread_pool__job_push(struct thread_pool__job *job)
49 {
50 	mutex_lock(&job_mutex);
51 	thread_pool__job_push_locked(job);
52 	mutex_unlock(&job_mutex);
53 }
54 
thread_pool__handle_job(struct thread_pool__job * job)55 static void thread_pool__handle_job(struct thread_pool__job *job)
56 {
57 	while (job) {
58 		job->callback(job->kvm, job->data);
59 
60 		mutex_lock(&job->mutex);
61 
62 		if (--job->signalcount > 0)
63 			/* If the job was signaled again while we were working */
64 			thread_pool__job_push(job);
65 
66 		mutex_unlock(&job->mutex);
67 
68 		job = thread_pool__job_pop();
69 	}
70 }
71 
thread_pool__threadfunc_cleanup(void * param)72 static void thread_pool__threadfunc_cleanup(void *param)
73 {
74 	mutex_unlock(&job_mutex);
75 }
76 
thread_pool__threadfunc(void * param)77 static void *thread_pool__threadfunc(void *param)
78 {
79 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
80 
81 	kvm__set_thread_name("threadpool-worker");
82 
83 	while (running) {
84 		struct thread_pool__job *curjob = NULL;
85 
86 		mutex_lock(&job_mutex);
87 		while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
88 			pthread_cond_wait(&job_cond, &job_mutex.mutex);
89 		mutex_unlock(&job_mutex);
90 
91 		if (running)
92 			thread_pool__handle_job(curjob);
93 	}
94 
95 	pthread_cleanup_pop(0);
96 
97 	return NULL;
98 }
99 
thread_pool__addthread(void)100 static int thread_pool__addthread(void)
101 {
102 	int res;
103 	void *newthreads;
104 
105 	mutex_lock(&thread_mutex);
106 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
107 	if (newthreads == NULL) {
108 		mutex_unlock(&thread_mutex);
109 		return -1;
110 	}
111 
112 	threads = newthreads;
113 
114 	res = pthread_create(threads + threadcount, NULL,
115 			     thread_pool__threadfunc, NULL);
116 
117 	if (res == 0)
118 		threadcount++;
119 	mutex_unlock(&thread_mutex);
120 
121 	return res;
122 }
123 
thread_pool__init(struct kvm * kvm)124 int thread_pool__init(struct kvm *kvm)
125 {
126 	unsigned long i;
127 	unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
128 
129 	running = true;
130 
131 	for (i = 0; i < thread_count; i++)
132 		if (thread_pool__addthread() < 0)
133 			return i;
134 
135 	return i;
136 }
137 late_init(thread_pool__init);
138 
thread_pool__exit(struct kvm * kvm)139 int thread_pool__exit(struct kvm *kvm)
140 {
141 	int i;
142 	void *NUL = NULL;
143 
144 	running = false;
145 
146 	for (i = 0; i < threadcount; i++) {
147 		mutex_lock(&job_mutex);
148 		pthread_cond_signal(&job_cond);
149 		mutex_unlock(&job_mutex);
150 	}
151 
152 	for (i = 0; i < threadcount; i++) {
153 		pthread_join(threads[i], NUL);
154 	}
155 
156 	return 0;
157 }
158 late_exit(thread_pool__exit);
159 
thread_pool__do_job(struct thread_pool__job * job)160 void thread_pool__do_job(struct thread_pool__job *job)
161 {
162 	struct thread_pool__job *jobinfo = job;
163 
164 	if (jobinfo == NULL || jobinfo->callback == NULL)
165 		return;
166 
167 	mutex_lock(&jobinfo->mutex);
168 	if (jobinfo->signalcount++ == 0)
169 		thread_pool__job_push(job);
170 	mutex_unlock(&jobinfo->mutex);
171 
172 	mutex_lock(&job_mutex);
173 	pthread_cond_signal(&job_cond);
174 	mutex_unlock(&job_mutex);
175 }
176 
thread_pool__cancel_job(struct thread_pool__job * job)177 void thread_pool__cancel_job(struct thread_pool__job *job)
178 {
179 	bool running;
180 
181 	/*
182 	 * If the job is queued but not running, remove it. Otherwise, wait for
183 	 * the signalcount to drop to 0, indicating that it has finished
184 	 * running. We assume that nobody is queueing this job -
185 	 * thread_pool__do_job() isn't called - while this function is running.
186 	 */
187 	do {
188 		mutex_lock(&job_mutex);
189 		if (list_empty(&job->queue)) {
190 			running = job->signalcount > 0;
191 		} else {
192 			list_del_init(&job->queue);
193 			job->signalcount = 0;
194 			running = false;
195 		}
196 		mutex_unlock(&job_mutex);
197 	} while (running);
198 }
199