1 #include "kvm/threadpool.h" 2 #include "kvm/mutex.h" 3 4 #include <linux/kernel.h> 5 #include <linux/list.h> 6 #include <pthread.h> 7 #include <stdbool.h> 8 9 static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 10 static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 11 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 12 13 static LIST_HEAD(head); 14 15 static pthread_t *threads; 16 static long threadcount; 17 static bool running; 18 19 static struct thread_pool__job *thread_pool__job_pop_locked(void) 20 { 21 struct thread_pool__job *job; 22 23 if (list_empty(&head)) 24 return NULL; 25 26 job = list_first_entry(&head, struct thread_pool__job, queue); 27 list_del(&job->queue); 28 29 return job; 30 } 31 32 static void thread_pool__job_push_locked(struct thread_pool__job *job) 33 { 34 list_add_tail(&job->queue, &head); 35 } 36 37 static struct thread_pool__job *thread_pool__job_pop(void) 38 { 39 struct thread_pool__job *job; 40 41 mutex_lock(&job_mutex); 42 job = thread_pool__job_pop_locked(); 43 mutex_unlock(&job_mutex); 44 return job; 45 } 46 47 static void thread_pool__job_push(struct thread_pool__job *job) 48 { 49 mutex_lock(&job_mutex); 50 thread_pool__job_push_locked(job); 51 mutex_unlock(&job_mutex); 52 } 53 54 static void thread_pool__handle_job(struct thread_pool__job *job) 55 { 56 while (job) { 57 job->callback(job->kvm, job->data); 58 59 mutex_lock(&job->mutex); 60 61 if (--job->signalcount > 0) 62 /* If the job was signaled again while we were working */ 63 thread_pool__job_push(job); 64 65 mutex_unlock(&job->mutex); 66 67 job = thread_pool__job_pop(); 68 } 69 } 70 71 static void thread_pool__threadfunc_cleanup(void *param) 72 { 73 mutex_unlock(&job_mutex); 74 } 75 76 static void *thread_pool__threadfunc(void *param) 77 { 78 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 79 80 while (running) { 81 struct thread_pool__job *curjob = NULL; 82 83 mutex_lock(&job_mutex); 84 while (running && (curjob = thread_pool__job_pop_locked()) == NULL) 85 pthread_cond_wait(&job_cond, &job_mutex); 86 mutex_unlock(&job_mutex); 87 88 if (running) 89 thread_pool__handle_job(curjob); 90 } 91 92 pthread_cleanup_pop(0); 93 94 return NULL; 95 } 96 97 static int thread_pool__addthread(void) 98 { 99 int res; 100 void *newthreads; 101 102 mutex_lock(&thread_mutex); 103 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 104 if (newthreads == NULL) { 105 mutex_unlock(&thread_mutex); 106 return -1; 107 } 108 109 threads = newthreads; 110 111 res = pthread_create(threads + threadcount, NULL, 112 thread_pool__threadfunc, NULL); 113 114 if (res == 0) 115 threadcount++; 116 mutex_unlock(&thread_mutex); 117 118 return res; 119 } 120 121 int thread_pool__init(struct kvm *kvm) 122 { 123 unsigned long i; 124 unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); 125 126 running = true; 127 128 for (i = 0; i < thread_count; i++) 129 if (thread_pool__addthread() < 0) 130 return i; 131 132 return i; 133 } 134 135 int thread_pool__exit(struct kvm *kvm) 136 { 137 int i; 138 void *NUL = NULL; 139 140 running = false; 141 142 for (i = 0; i < threadcount; i++) { 143 mutex_lock(&job_mutex); 144 pthread_cond_signal(&job_cond); 145 mutex_unlock(&job_mutex); 146 } 147 148 for (i = 0; i < threadcount; i++) { 149 pthread_join(threads[i], NUL); 150 } 151 152 return 0; 153 } 154 155 void thread_pool__do_job(struct thread_pool__job *job) 156 { 157 struct thread_pool__job *jobinfo = job; 158 159 if (jobinfo == NULL || jobinfo->callback == NULL) 160 return; 161 162 mutex_lock(&jobinfo->mutex); 163 if (jobinfo->signalcount++ == 0) 164 thread_pool__job_push(job); 165 mutex_unlock(&jobinfo->mutex); 166 167 mutex_lock(&job_mutex); 168 pthread_cond_signal(&job_cond); 169 mutex_unlock(&job_mutex); 170 } 171