xref: /kvmtool/util/threadpool.c (revision df0c7f571eb7fad7895e0930cd695eb039b1fe57)
1 #include "kvm/threadpool.h"
2 #include "kvm/mutex.h"
3 
4 #include <linux/kernel.h>
5 #include <linux/list.h>
6 #include <pthread.h>
7 #include <stdbool.h>
8 
/*
 * Pending-job queue. The list is modified with job_mutex held, and worker
 * threads sleep on job_cond until thread_pool__do_job() signals it.
 */
static LIST_HEAD(head);
static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;

/*
 * Worker thread handles and how many of them are valid. Both are updated
 * with thread_mutex held.
 */
static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t *threads;
static long threadcount;
17 
18 static struct thread_pool__job *thread_pool__job_pop(void)
19 {
20 	struct thread_pool__job *job;
21 
22 	if (list_empty(&head))
23 		return NULL;
24 
25 	job = list_first_entry(&head, struct thread_pool__job, queue);
26 	list_del(&job->queue);
27 
28 	return job;
29 }
30 
31 static void thread_pool__job_push(struct thread_pool__job *job)
32 {
33 	list_add_tail(&job->queue, &head);
34 }
35 
36 static struct thread_pool__job *thread_pool__job_pop_locked(void)
37 {
38 	struct thread_pool__job *job;
39 
40 	mutex_lock(&job_mutex);
41 	job = thread_pool__job_pop();
42 	mutex_unlock(&job_mutex);
43 	return job;
44 }
45 
46 static void thread_pool__job_push_locked(struct thread_pool__job *job)
47 {
48 	mutex_lock(&job_mutex);
49 	thread_pool__job_push(job);
50 	mutex_unlock(&job_mutex);
51 }
52 
53 static void thread_pool__handle_job(struct thread_pool__job *job)
54 {
55 	while (job) {
56 		job->callback(job->kvm, job->data);
57 
58 		mutex_lock(&job->mutex);
59 
60 		if (--job->signalcount > 0)
61 			/* If the job was signaled again while we were working */
62 			thread_pool__job_push_locked(job);
63 
64 		mutex_unlock(&job->mutex);
65 
66 		job = thread_pool__job_pop_locked();
67 	}
68 }
69 
70 static void thread_pool__threadfunc_cleanup(void *param)
71 {
72 	mutex_unlock(&job_mutex);
73 }
74 
75 static void *thread_pool__threadfunc(void *param)
76 {
77 	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
78 
79 	for (;;) {
80 		struct thread_pool__job *curjob;
81 
82 		mutex_lock(&job_mutex);
83 		pthread_cond_wait(&job_cond, &job_mutex);
84 		curjob = thread_pool__job_pop();
85 		mutex_unlock(&job_mutex);
86 
87 		if (curjob)
88 			thread_pool__handle_job(curjob);
89 	}
90 
91 	pthread_cleanup_pop(0);
92 
93 	return NULL;
94 }
95 
96 static int thread_pool__addthread(void)
97 {
98 	int res;
99 	void *newthreads;
100 
101 	mutex_lock(&thread_mutex);
102 	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
103 	if (newthreads == NULL) {
104 		mutex_unlock(&thread_mutex);
105 		return -1;
106 	}
107 
108 	threads = newthreads;
109 
110 	res = pthread_create(threads + threadcount, NULL,
111 			     thread_pool__threadfunc, NULL);
112 
113 	if (res == 0)
114 		threadcount++;
115 	mutex_unlock(&thread_mutex);
116 
117 	return res;
118 }
119 
/*
 * Try to start @thread_count worker threads, one
 * thread_pool__addthread() call per thread.
 *
 * Stops at the first call that returns a negative value. Returns the number
 * of calls made before that point, which equals @thread_count if none of
 * them returned a negative value.
 */
int thread_pool__init(unsigned long thread_count)
{
	unsigned long started = 0;

	while (started < thread_count && thread_pool__addthread() >= 0)
		started++;

	return started;
}
130 
131 void thread_pool__do_job(struct thread_pool__job *job)
132 {
133 	struct thread_pool__job *jobinfo = job;
134 
135 	if (jobinfo == NULL || jobinfo->callback == NULL)
136 		return;
137 
138 	mutex_lock(&jobinfo->mutex);
139 	if (jobinfo->signalcount++ == 0)
140 		thread_pool__job_push_locked(job);
141 	mutex_unlock(&jobinfo->mutex);
142 
143 	mutex_lock(&job_mutex);
144 	pthread_cond_signal(&job_cond);
145 	mutex_unlock(&job_mutex);
146 }
147