1 #include "kvm/threadpool.h" 2 #include "kvm/mutex.h" 3 4 #include <linux/kernel.h> 5 #include <linux/list.h> 6 #include <pthread.h> 7 #include <stdbool.h> 8 9 static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 10 static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 11 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 12 13 static LIST_HEAD(head); 14 15 static pthread_t *threads; 16 static long threadcount; 17 18 static struct thread_pool__job *thread_pool__job_pop_locked(void) 19 { 20 struct thread_pool__job *job; 21 22 if (list_empty(&head)) 23 return NULL; 24 25 job = list_first_entry(&head, struct thread_pool__job, queue); 26 list_del(&job->queue); 27 28 return job; 29 } 30 31 static void thread_pool__job_push_locked(struct thread_pool__job *job) 32 { 33 list_add_tail(&job->queue, &head); 34 } 35 36 static struct thread_pool__job *thread_pool__job_pop(void) 37 { 38 struct thread_pool__job *job; 39 40 mutex_lock(&job_mutex); 41 job = thread_pool__job_pop_locked(); 42 mutex_unlock(&job_mutex); 43 return job; 44 } 45 46 static void thread_pool__job_push(struct thread_pool__job *job) 47 { 48 mutex_lock(&job_mutex); 49 thread_pool__job_push_locked(job); 50 mutex_unlock(&job_mutex); 51 } 52 53 static void thread_pool__handle_job(struct thread_pool__job *job) 54 { 55 while (job) { 56 job->callback(job->kvm, job->data); 57 58 mutex_lock(&job->mutex); 59 60 if (--job->signalcount > 0) 61 /* If the job was signaled again while we were working */ 62 thread_pool__job_push(job); 63 64 mutex_unlock(&job->mutex); 65 66 job = thread_pool__job_pop(); 67 } 68 } 69 70 static void thread_pool__threadfunc_cleanup(void *param) 71 { 72 mutex_unlock(&job_mutex); 73 } 74 75 static void *thread_pool__threadfunc(void *param) 76 { 77 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 78 79 for (;;) { 80 struct thread_pool__job *curjob; 81 82 mutex_lock(&job_mutex); 83 while ((curjob = thread_pool__job_pop_locked()) == NULL) 84 pthread_cond_wait(&job_cond, &job_mutex); 85 mutex_unlock(&job_mutex); 86 87 thread_pool__handle_job(curjob); 88 } 89 90 pthread_cleanup_pop(0); 91 92 return NULL; 93 } 94 95 static int thread_pool__addthread(void) 96 { 97 int res; 98 void *newthreads; 99 100 mutex_lock(&thread_mutex); 101 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 102 if (newthreads == NULL) { 103 mutex_unlock(&thread_mutex); 104 return -1; 105 } 106 107 threads = newthreads; 108 109 res = pthread_create(threads + threadcount, NULL, 110 thread_pool__threadfunc, NULL); 111 112 if (res == 0) 113 threadcount++; 114 mutex_unlock(&thread_mutex); 115 116 return res; 117 } 118 119 int thread_pool__init(unsigned long thread_count) 120 { 121 unsigned long i; 122 123 for (i = 0; i < thread_count; i++) 124 if (thread_pool__addthread() < 0) 125 return i; 126 127 return i; 128 } 129 130 void thread_pool__do_job(struct thread_pool__job *job) 131 { 132 struct thread_pool__job *jobinfo = job; 133 134 if (jobinfo == NULL || jobinfo->callback == NULL) 135 return; 136 137 mutex_lock(&jobinfo->mutex); 138 if (jobinfo->signalcount++ == 0) 139 thread_pool__job_push(job); 140 mutex_unlock(&jobinfo->mutex); 141 142 mutex_lock(&job_mutex); 143 pthread_cond_signal(&job_cond); 144 mutex_unlock(&job_mutex); 145 } 146