1 #include "kvm/threadpool.h" 2 #include "kvm/mutex.h" 3 #include "kvm/kvm.h" 4 5 #include <linux/kernel.h> 6 #include <linux/list.h> 7 #include <pthread.h> 8 #include <stdbool.h> 9 10 static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 11 static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 12 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 13 14 static LIST_HEAD(head); 15 16 static pthread_t *threads; 17 static long threadcount; 18 static bool running; 19 20 static struct thread_pool__job *thread_pool__job_pop_locked(void) 21 { 22 struct thread_pool__job *job; 23 24 if (list_empty(&head)) 25 return NULL; 26 27 job = list_first_entry(&head, struct thread_pool__job, queue); 28 list_del(&job->queue); 29 30 return job; 31 } 32 33 static void thread_pool__job_push_locked(struct thread_pool__job *job) 34 { 35 list_add_tail(&job->queue, &head); 36 } 37 38 static struct thread_pool__job *thread_pool__job_pop(void) 39 { 40 struct thread_pool__job *job; 41 42 mutex_lock(&job_mutex); 43 job = thread_pool__job_pop_locked(); 44 mutex_unlock(&job_mutex); 45 return job; 46 } 47 48 static void thread_pool__job_push(struct thread_pool__job *job) 49 { 50 mutex_lock(&job_mutex); 51 thread_pool__job_push_locked(job); 52 mutex_unlock(&job_mutex); 53 } 54 55 static void thread_pool__handle_job(struct thread_pool__job *job) 56 { 57 while (job) { 58 job->callback(job->kvm, job->data); 59 60 mutex_lock(&job->mutex); 61 62 if (--job->signalcount > 0) 63 /* If the job was signaled again while we were working */ 64 thread_pool__job_push(job); 65 66 mutex_unlock(&job->mutex); 67 68 job = thread_pool__job_pop(); 69 } 70 } 71 72 static void thread_pool__threadfunc_cleanup(void *param) 73 { 74 mutex_unlock(&job_mutex); 75 } 76 77 static void *thread_pool__threadfunc(void *param) 78 { 79 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 80 81 while (running) { 82 struct thread_pool__job *curjob = NULL; 83 84 mutex_lock(&job_mutex); 85 while (running && (curjob = thread_pool__job_pop_locked()) == NULL) 86 pthread_cond_wait(&job_cond, &job_mutex); 87 mutex_unlock(&job_mutex); 88 89 if (running) 90 thread_pool__handle_job(curjob); 91 } 92 93 pthread_cleanup_pop(0); 94 95 return NULL; 96 } 97 98 static int thread_pool__addthread(void) 99 { 100 int res; 101 void *newthreads; 102 103 mutex_lock(&thread_mutex); 104 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 105 if (newthreads == NULL) { 106 mutex_unlock(&thread_mutex); 107 return -1; 108 } 109 110 threads = newthreads; 111 112 res = pthread_create(threads + threadcount, NULL, 113 thread_pool__threadfunc, NULL); 114 115 if (res == 0) 116 threadcount++; 117 mutex_unlock(&thread_mutex); 118 119 return res; 120 } 121 122 int thread_pool__init(struct kvm *kvm) 123 { 124 unsigned long i; 125 unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); 126 127 running = true; 128 129 for (i = 0; i < thread_count; i++) 130 if (thread_pool__addthread() < 0) 131 return i; 132 133 return i; 134 } 135 late_init(thread_pool__init); 136 137 int thread_pool__exit(struct kvm *kvm) 138 { 139 int i; 140 void *NUL = NULL; 141 142 running = false; 143 144 for (i = 0; i < threadcount; i++) { 145 mutex_lock(&job_mutex); 146 pthread_cond_signal(&job_cond); 147 mutex_unlock(&job_mutex); 148 } 149 150 for (i = 0; i < threadcount; i++) { 151 pthread_join(threads[i], NUL); 152 } 153 154 return 0; 155 } 156 late_exit(thread_pool__exit); 157 158 void thread_pool__do_job(struct thread_pool__job *job) 159 { 160 struct thread_pool__job *jobinfo = job; 161 162 if (jobinfo == NULL || jobinfo->callback == NULL) 163 return; 164 165 mutex_lock(&jobinfo->mutex); 166 if (jobinfo->signalcount++ == 0) 167 thread_pool__job_push(job); 168 mutex_unlock(&jobinfo->mutex); 169 170 mutex_lock(&job_mutex); 171 pthread_cond_signal(&job_cond); 172 mutex_unlock(&job_mutex); 173 } 174