1 #include "kvm/threadpool.h" 2 #include "kvm/mutex.h" 3 4 #include <linux/kernel.h> 5 #include <linux/list.h> 6 #include <pthread.h> 7 #include <stdbool.h> 8 9 static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 10 static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 11 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 12 13 static LIST_HEAD(head); 14 15 static pthread_t *threads; 16 static long threadcount; 17 18 static struct thread_pool__job *thread_pool__job_pop(void) 19 { 20 struct thread_pool__job *job; 21 22 if (list_empty(&head)) 23 return NULL; 24 25 job = list_first_entry(&head, struct thread_pool__job, queue); 26 list_del(&job->queue); 27 28 return job; 29 } 30 31 static void thread_pool__job_push(struct thread_pool__job *job) 32 { 33 list_add_tail(&job->queue, &head); 34 } 35 36 static struct thread_pool__job *thread_pool__job_pop_locked(void) 37 { 38 struct thread_pool__job *job; 39 40 mutex_lock(&job_mutex); 41 job = thread_pool__job_pop(); 42 mutex_unlock(&job_mutex); 43 return job; 44 } 45 46 static void thread_pool__job_push_locked(struct thread_pool__job *job) 47 { 48 mutex_lock(&job_mutex); 49 thread_pool__job_push(job); 50 mutex_unlock(&job_mutex); 51 } 52 53 static void thread_pool__handle_job(struct thread_pool__job *job) 54 { 55 while (job) { 56 job->callback(job->kvm, job->data); 57 58 mutex_lock(&job->mutex); 59 60 if (--job->signalcount > 0) 61 /* If the job was signaled again while we were working */ 62 thread_pool__job_push_locked(job); 63 64 mutex_unlock(&job->mutex); 65 66 job = thread_pool__job_pop_locked(); 67 } 68 } 69 70 static void thread_pool__threadfunc_cleanup(void *param) 71 { 72 mutex_unlock(&job_mutex); 73 } 74 75 static void *thread_pool__threadfunc(void *param) 76 { 77 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 78 79 for (;;) { 80 struct thread_pool__job *curjob; 81 82 mutex_lock(&job_mutex); 83 pthread_cond_wait(&job_cond, &job_mutex); 84 curjob = thread_pool__job_pop(); 85 mutex_unlock(&job_mutex); 86 87 if (curjob) 88 thread_pool__handle_job(curjob); 89 } 90 91 pthread_cleanup_pop(0); 92 93 return NULL; 94 } 95 96 static int thread_pool__addthread(void) 97 { 98 int res; 99 void *newthreads; 100 101 mutex_lock(&thread_mutex); 102 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 103 if (newthreads == NULL) { 104 mutex_unlock(&thread_mutex); 105 return -1; 106 } 107 108 threads = newthreads; 109 110 res = pthread_create(threads + threadcount, NULL, 111 thread_pool__threadfunc, NULL); 112 113 if (res == 0) 114 threadcount++; 115 mutex_unlock(&thread_mutex); 116 117 return res; 118 } 119 120 int thread_pool__init(unsigned long thread_count) 121 { 122 unsigned long i; 123 124 for (i = 0; i < thread_count; i++) 125 if (thread_pool__addthread() < 0) 126 return i; 127 128 return i; 129 } 130 131 void thread_pool__do_job(struct thread_pool__job *job) 132 { 133 struct thread_pool__job *jobinfo = job; 134 135 if (jobinfo == NULL || jobinfo->callback == NULL) 136 return; 137 138 mutex_lock(&jobinfo->mutex); 139 if (jobinfo->signalcount++ == 0) 140 thread_pool__job_push_locked(job); 141 mutex_unlock(&jobinfo->mutex); 142 143 mutex_lock(&job_mutex); 144 pthread_cond_signal(&job_cond); 145 mutex_unlock(&job_mutex); 146 } 147