1 #include "kvm/threadpool.h" 2 #include "kvm/mutex.h" 3 4 #include <linux/kernel.h> 5 #include <linux/list.h> 6 #include <pthread.h> 7 #include <stdbool.h> 8 9 struct thread_pool__job { 10 kvm_thread_callback_fn_t callback; 11 struct kvm *kvm; 12 void *data; 13 14 int signalcount; 15 pthread_mutex_t mutex; 16 17 struct list_head queue; 18 }; 19 20 static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 21 static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 22 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 23 24 static LIST_HEAD(head); 25 26 static pthread_t *threads; 27 static long threadcount; 28 29 static struct thread_pool__job *thread_pool__job_pop(void) 30 { 31 struct thread_pool__job *job; 32 33 if (list_empty(&head)) 34 return NULL; 35 36 job = list_first_entry(&head, struct thread_pool__job, queue); 37 list_del(&job->queue); 38 39 return job; 40 } 41 42 static void thread_pool__job_push(struct thread_pool__job *job) 43 { 44 list_add_tail(&job->queue, &head); 45 } 46 47 static struct thread_pool__job *thread_pool__job_pop_locked(void) 48 { 49 struct thread_pool__job *job; 50 51 mutex_lock(&job_mutex); 52 job = thread_pool__job_pop(); 53 mutex_unlock(&job_mutex); 54 return job; 55 } 56 57 static void thread_pool__job_push_locked(struct thread_pool__job *job) 58 { 59 mutex_lock(&job_mutex); 60 thread_pool__job_push(job); 61 mutex_unlock(&job_mutex); 62 } 63 64 static void thread_pool__handle_job(struct thread_pool__job *job) 65 { 66 while (job) { 67 job->callback(job->kvm, job->data); 68 69 mutex_lock(&job->mutex); 70 71 if (--job->signalcount > 0) 72 /* If the job was signaled again while we were working */ 73 thread_pool__job_push_locked(job); 74 75 mutex_unlock(&job->mutex); 76 77 job = thread_pool__job_pop_locked(); 78 } 79 } 80 81 static void thread_pool__threadfunc_cleanup(void *param) 82 { 83 mutex_unlock(&job_mutex); 84 } 85 86 static void *thread_pool__threadfunc(void *param) 87 { 88 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 89 90 for (;;) { 91 struct thread_pool__job *curjob; 92 93 mutex_lock(&job_mutex); 94 pthread_cond_wait(&job_cond, &job_mutex); 95 curjob = thread_pool__job_pop(); 96 mutex_unlock(&job_mutex); 97 98 if (curjob) 99 thread_pool__handle_job(curjob); 100 } 101 102 pthread_cleanup_pop(0); 103 104 return NULL; 105 } 106 107 static int thread_pool__addthread(void) 108 { 109 int res; 110 void *newthreads; 111 112 mutex_lock(&thread_mutex); 113 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 114 if (newthreads == NULL) { 115 mutex_unlock(&thread_mutex); 116 return -1; 117 } 118 119 threads = newthreads; 120 121 res = pthread_create(threads + threadcount, NULL, 122 thread_pool__threadfunc, NULL); 123 124 if (res == 0) 125 threadcount++; 126 mutex_unlock(&thread_mutex); 127 128 return res; 129 } 130 131 int thread_pool__init(unsigned long thread_count) 132 { 133 unsigned long i; 134 135 for (i = 0; i < thread_count; i++) 136 if (thread_pool__addthread() < 0) 137 return i; 138 139 return i; 140 } 141 142 void *thread_pool__add_job(struct kvm *kvm, 143 kvm_thread_callback_fn_t callback, void *data) 144 { 145 struct thread_pool__job *job = calloc(1, sizeof(*job)); 146 147 *job = (struct thread_pool__job) { 148 .kvm = kvm, 149 .data = data, 150 .callback = callback, 151 .mutex = PTHREAD_MUTEX_INITIALIZER 152 }; 153 154 return job; 155 } 156 157 void thread_pool__do_job(void *job) 158 { 159 struct thread_pool__job *jobinfo = job; 160 161 if (jobinfo == NULL) 162 return; 163 164 mutex_lock(&jobinfo->mutex); 165 if (jobinfo->signalcount++ == 0) 166 thread_pool__job_push_locked(job); 167 mutex_unlock(&jobinfo->mutex); 168 169 mutex_lock(&job_mutex); 170 pthread_cond_signal(&job_cond); 171 mutex_unlock(&job_mutex); 172 } 173