1 #include "kvm/threadpool.h" 2 #include "kvm/mutex.h" 3 #include "kvm/kvm.h" 4 5 #include <linux/kernel.h> 6 #include <linux/list.h> 7 #include <pthread.h> 8 #include <stdbool.h> 9 10 static DEFINE_MUTEX(job_mutex); 11 static DEFINE_MUTEX(thread_mutex); 12 static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 13 14 static LIST_HEAD(head); 15 16 static pthread_t *threads; 17 static long threadcount; 18 static bool running; 19 20 static struct thread_pool__job *thread_pool__job_pop_locked(void) 21 { 22 struct thread_pool__job *job; 23 24 if (list_empty(&head)) 25 return NULL; 26 27 job = list_first_entry(&head, struct thread_pool__job, queue); 28 list_del(&job->queue); 29 30 return job; 31 } 32 33 static void thread_pool__job_push_locked(struct thread_pool__job *job) 34 { 35 list_add_tail(&job->queue, &head); 36 } 37 38 static struct thread_pool__job *thread_pool__job_pop(void) 39 { 40 struct thread_pool__job *job; 41 42 mutex_lock(&job_mutex); 43 job = thread_pool__job_pop_locked(); 44 mutex_unlock(&job_mutex); 45 return job; 46 } 47 48 static void thread_pool__job_push(struct thread_pool__job *job) 49 { 50 mutex_lock(&job_mutex); 51 thread_pool__job_push_locked(job); 52 mutex_unlock(&job_mutex); 53 } 54 55 static void thread_pool__handle_job(struct thread_pool__job *job) 56 { 57 while (job) { 58 job->callback(job->kvm, job->data); 59 60 mutex_lock(&job->mutex); 61 62 if (--job->signalcount > 0) 63 /* If the job was signaled again while we were working */ 64 thread_pool__job_push(job); 65 66 mutex_unlock(&job->mutex); 67 68 job = thread_pool__job_pop(); 69 } 70 } 71 72 static void thread_pool__threadfunc_cleanup(void *param) 73 { 74 mutex_unlock(&job_mutex); 75 } 76 77 static void *thread_pool__threadfunc(void *param) 78 { 79 pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 80 81 kvm__set_thread_name("threadpool-worker"); 82 83 while (running) { 84 struct thread_pool__job *curjob = NULL; 85 86 mutex_lock(&job_mutex); 87 while (running && (curjob = thread_pool__job_pop_locked()) == NULL) 88 pthread_cond_wait(&job_cond, &job_mutex.mutex); 89 mutex_unlock(&job_mutex); 90 91 if (running) 92 thread_pool__handle_job(curjob); 93 } 94 95 pthread_cleanup_pop(0); 96 97 return NULL; 98 } 99 100 static int thread_pool__addthread(void) 101 { 102 int res; 103 void *newthreads; 104 105 mutex_lock(&thread_mutex); 106 newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 107 if (newthreads == NULL) { 108 mutex_unlock(&thread_mutex); 109 return -1; 110 } 111 112 threads = newthreads; 113 114 res = pthread_create(threads + threadcount, NULL, 115 thread_pool__threadfunc, NULL); 116 117 if (res == 0) 118 threadcount++; 119 mutex_unlock(&thread_mutex); 120 121 return res; 122 } 123 124 int thread_pool__init(struct kvm *kvm) 125 { 126 unsigned long i; 127 unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); 128 129 running = true; 130 131 for (i = 0; i < thread_count; i++) 132 if (thread_pool__addthread() < 0) 133 return i; 134 135 return i; 136 } 137 late_init(thread_pool__init); 138 139 int thread_pool__exit(struct kvm *kvm) 140 { 141 int i; 142 void *NUL = NULL; 143 144 running = false; 145 146 for (i = 0; i < threadcount; i++) { 147 mutex_lock(&job_mutex); 148 pthread_cond_signal(&job_cond); 149 mutex_unlock(&job_mutex); 150 } 151 152 for (i = 0; i < threadcount; i++) { 153 pthread_join(threads[i], NUL); 154 } 155 156 return 0; 157 } 158 late_exit(thread_pool__exit); 159 160 void thread_pool__do_job(struct thread_pool__job *job) 161 { 162 struct thread_pool__job 
*jobinfo = job; 163 164 if (jobinfo == NULL || jobinfo->callback == NULL) 165 return; 166 167 mutex_lock(&jobinfo->mutex); 168 if (jobinfo->signalcount++ == 0) 169 thread_pool__job_push(job); 170 mutex_unlock(&jobinfo->mutex); 171 172 mutex_lock(&job_mutex); 173 pthread_cond_signal(&job_cond); 174 mutex_unlock(&job_mutex); 175 } 176