xref: /kvmtool/util/threadpool.c (revision feceecd7734ae1bc34e6444739e9c340d78d78a4)
1 #include "kvm/threadpool.h"
2 #include "kvm/mutex.h"
3 
4 #include <linux/kernel.h>
5 #include <linux/list.h>
6 #include <pthread.h>
7 #include <stdbool.h>
8 
/*
 * Pending-job queue. The list is manipulated under job_mutex; job_cond is
 * signalled by thread_pool__do_job() and thread_pool__exit() to wake workers.
 */
static LIST_HEAD(head);
static pthread_mutex_t	job_mutex	= PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t	job_cond	= PTHREAD_COND_INITIALIZER;

/* Worker bookkeeping. thread_mutex is held while the threads array grows. */
static pthread_mutex_t	thread_mutex	= PTHREAD_MUTEX_INITIALIZER;
static pthread_t	*threads;
static long		threadcount;

/* Workers keep looping for as long as this flag is true. */
static bool		running;
18 
19 static struct thread_pool__job *thread_pool__job_pop_locked(void)
20 {
21 	struct thread_pool__job *job;
22 
23 	if (list_empty(&head))
24 		return NULL;
25 
26 	job = list_first_entry(&head, struct thread_pool__job, queue);
27 	list_del(&job->queue);
28 
29 	return job;
30 }
31 
32 static void thread_pool__job_push_locked(struct thread_pool__job *job)
33 {
34 	list_add_tail(&job->queue, &head);
35 }
36 
37 static struct thread_pool__job *thread_pool__job_pop(void)
38 {
39 	struct thread_pool__job *job;
40 
41 	mutex_lock(&job_mutex);
42 	job = thread_pool__job_pop_locked();
43 	mutex_unlock(&job_mutex);
44 	return job;
45 }
46 
47 static void thread_pool__job_push(struct thread_pool__job *job)
48 {
49 	mutex_lock(&job_mutex);
50 	thread_pool__job_push_locked(job);
51 	mutex_unlock(&job_mutex);
52 }
53 
54 static void thread_pool__handle_job(struct thread_pool__job *job)
55 {
56 	while (job) {
57 		job->callback(job->kvm, job->data);
58 
59 		mutex_lock(&job->mutex);
60 
61 		if (--job->signalcount > 0)
62 			/* If the job was signaled again while we were working */
63 			thread_pool__job_push(job);
64 
65 		mutex_unlock(&job->mutex);
66 
67 		job = thread_pool__job_pop();
68 	}
69 }
70 
71 static void thread_pool__threadfunc_cleanup(void *param)
72 {
73 	mutex_unlock(&job_mutex);
74 }
75 
76 static void *thread_pool__threadfunc(void *param)
77 {
78 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
79 
80 	while (running) {
81 		struct thread_pool__job *curjob = NULL;
82 
83 		mutex_lock(&job_mutex);
84 		while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
85 			pthread_cond_wait(&job_cond, &job_mutex);
86 		mutex_unlock(&job_mutex);
87 
88 		if (running)
89 			thread_pool__handle_job(curjob);
90 	}
91 
92 	pthread_cleanup_pop(0);
93 
94 	return NULL;
95 }
96 
97 static int thread_pool__addthread(void)
98 {
99 	int res;
100 	void *newthreads;
101 
102 	mutex_lock(&thread_mutex);
103 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
104 	if (newthreads == NULL) {
105 		mutex_unlock(&thread_mutex);
106 		return -1;
107 	}
108 
109 	threads = newthreads;
110 
111 	res = pthread_create(threads + threadcount, NULL,
112 			     thread_pool__threadfunc, NULL);
113 
114 	if (res == 0)
115 		threadcount++;
116 	mutex_unlock(&thread_mutex);
117 
118 	return res;
119 }
120 
121 int thread_pool__init(struct kvm *kvm)
122 {
123 	unsigned long i;
124 	unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);
125 
126 	running = true;
127 
128 	for (i = 0; i < thread_count; i++)
129 		if (thread_pool__addthread() < 0)
130 			return i;
131 
132 	return i;
133 }
134 
135 int thread_pool__exit(struct kvm *kvm)
136 {
137 	int i;
138 	void *NUL = NULL;
139 
140 	running = false;
141 
142 	for (i = 0; i < threadcount; i++) {
143 		mutex_lock(&job_mutex);
144 		pthread_cond_signal(&job_cond);
145 		mutex_unlock(&job_mutex);
146 	}
147 
148 	for (i = 0; i < threadcount; i++) {
149 		pthread_join(threads[i], NUL);
150 	}
151 
152 	return 0;
153 }
154 
155 void thread_pool__do_job(struct thread_pool__job *job)
156 {
157 	struct thread_pool__job *jobinfo = job;
158 
159 	if (jobinfo == NULL || jobinfo->callback == NULL)
160 		return;
161 
162 	mutex_lock(&jobinfo->mutex);
163 	if (jobinfo->signalcount++ == 0)
164 		thread_pool__job_push(job);
165 	mutex_unlock(&jobinfo->mutex);
166 
167 	mutex_lock(&job_mutex);
168 	pthread_cond_signal(&job_cond);
169 	mutex_unlock(&job_mutex);
170 }
171