1*d60bafe5SSasha Levin #include "kvm/threadpool.h" 2*d60bafe5SSasha Levin #include "kvm/mutex.h" 3*d60bafe5SSasha Levin 4*d60bafe5SSasha Levin #include <linux/kernel.h> 5*d60bafe5SSasha Levin #include <linux/list.h> 6*d60bafe5SSasha Levin #include <pthread.h> 7*d60bafe5SSasha Levin #include <stdbool.h> 8*d60bafe5SSasha Levin 9*d60bafe5SSasha Levin struct thread_pool__job_info { 10*d60bafe5SSasha Levin kvm_thread_callback_fn_t callback; 11*d60bafe5SSasha Levin struct kvm *kvm; 12*d60bafe5SSasha Levin void *data; 13*d60bafe5SSasha Levin 14*d60bafe5SSasha Levin int signalcount; 15*d60bafe5SSasha Levin pthread_mutex_t mutex; 16*d60bafe5SSasha Levin 17*d60bafe5SSasha Levin struct list_head queue; 18*d60bafe5SSasha Levin }; 19*d60bafe5SSasha Levin 20*d60bafe5SSasha Levin static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 21*d60bafe5SSasha Levin static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 22*d60bafe5SSasha Levin static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 23*d60bafe5SSasha Levin 24*d60bafe5SSasha Levin static LIST_HEAD(head); 25*d60bafe5SSasha Levin 26*d60bafe5SSasha Levin static pthread_t *threads; 27*d60bafe5SSasha Levin static long threadcount; 28*d60bafe5SSasha Levin 29*d60bafe5SSasha Levin static struct thread_pool__job_info *thread_pool__job_info_pop(void) 30*d60bafe5SSasha Levin { 31*d60bafe5SSasha Levin struct thread_pool__job_info *job; 32*d60bafe5SSasha Levin 33*d60bafe5SSasha Levin if (list_empty(&head)) 34*d60bafe5SSasha Levin return NULL; 35*d60bafe5SSasha Levin 36*d60bafe5SSasha Levin job = list_first_entry(&head, struct thread_pool__job_info, queue); 37*d60bafe5SSasha Levin list_del(&job->queue); 38*d60bafe5SSasha Levin 39*d60bafe5SSasha Levin return job; 40*d60bafe5SSasha Levin } 41*d60bafe5SSasha Levin 42*d60bafe5SSasha Levin static void thread_pool__job_info_push(struct thread_pool__job_info *job) 43*d60bafe5SSasha Levin { 44*d60bafe5SSasha Levin list_add_tail(&job->queue, &head); 45*d60bafe5SSasha Levin } 46*d60bafe5SSasha Levin 47*d60bafe5SSasha Levin static struct thread_pool__job_info *thread_pool__job_info_pop_locked(void) 48*d60bafe5SSasha Levin { 49*d60bafe5SSasha Levin struct thread_pool__job_info *job; 50*d60bafe5SSasha Levin 51*d60bafe5SSasha Levin mutex_lock(&job_mutex); 52*d60bafe5SSasha Levin job = thread_pool__job_info_pop(); 53*d60bafe5SSasha Levin mutex_unlock(&job_mutex); 54*d60bafe5SSasha Levin return job; 55*d60bafe5SSasha Levin } 56*d60bafe5SSasha Levin 57*d60bafe5SSasha Levin static void thread_pool__job_info_push_locked(struct thread_pool__job_info *job) 58*d60bafe5SSasha Levin { 59*d60bafe5SSasha Levin mutex_lock(&job_mutex); 60*d60bafe5SSasha Levin thread_pool__job_info_push(job); 61*d60bafe5SSasha Levin mutex_unlock(&job_mutex); 62*d60bafe5SSasha Levin } 63*d60bafe5SSasha Levin 64*d60bafe5SSasha Levin static void thread_pool__handle_job(struct thread_pool__job_info *job) 65*d60bafe5SSasha Levin { 66*d60bafe5SSasha Levin while (job) { 67*d60bafe5SSasha Levin job->callback(job->kvm, job->data); 68*d60bafe5SSasha Levin 69*d60bafe5SSasha Levin mutex_lock(&job->mutex); 70*d60bafe5SSasha Levin 71*d60bafe5SSasha Levin if (--job->signalcount > 0) 72*d60bafe5SSasha Levin /* If the job was signaled again while we were working */ 73*d60bafe5SSasha Levin thread_pool__job_info_push_locked(job); 74*d60bafe5SSasha Levin 75*d60bafe5SSasha Levin mutex_unlock(&job->mutex); 76*d60bafe5SSasha Levin 77*d60bafe5SSasha Levin job = thread_pool__job_info_pop_locked(); 78*d60bafe5SSasha Levin } 79*d60bafe5SSasha Levin } 80*d60bafe5SSasha Levin 81*d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param) 82*d60bafe5SSasha Levin { 83*d60bafe5SSasha Levin mutex_unlock(&job_mutex); 84*d60bafe5SSasha Levin } 85*d60bafe5SSasha Levin 86*d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param) 87*d60bafe5SSasha Levin { 88*d60bafe5SSasha Levin pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 89*d60bafe5SSasha Levin 90*d60bafe5SSasha Levin for (;;) { 91*d60bafe5SSasha Levin struct thread_pool__job_info *curjob; 92*d60bafe5SSasha Levin 93*d60bafe5SSasha Levin mutex_lock(&job_mutex); 94*d60bafe5SSasha Levin pthread_cond_wait(&job_cond, &job_mutex); 95*d60bafe5SSasha Levin curjob = thread_pool__job_info_pop(); 96*d60bafe5SSasha Levin mutex_unlock(&job_mutex); 97*d60bafe5SSasha Levin 98*d60bafe5SSasha Levin if (curjob) 99*d60bafe5SSasha Levin thread_pool__handle_job(curjob); 100*d60bafe5SSasha Levin } 101*d60bafe5SSasha Levin 102*d60bafe5SSasha Levin pthread_cleanup_pop(0); 103*d60bafe5SSasha Levin 104*d60bafe5SSasha Levin return NULL; 105*d60bafe5SSasha Levin } 106*d60bafe5SSasha Levin 107*d60bafe5SSasha Levin static int thread_pool__addthread(void) 108*d60bafe5SSasha Levin { 109*d60bafe5SSasha Levin int res; 110*d60bafe5SSasha Levin void *newthreads; 111*d60bafe5SSasha Levin 112*d60bafe5SSasha Levin mutex_lock(&thread_mutex); 113*d60bafe5SSasha Levin newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 114*d60bafe5SSasha Levin if (newthreads == NULL) { 115*d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 116*d60bafe5SSasha Levin return -1; 117*d60bafe5SSasha Levin } 118*d60bafe5SSasha Levin 119*d60bafe5SSasha Levin threads = newthreads; 120*d60bafe5SSasha Levin 121*d60bafe5SSasha Levin res = pthread_create(threads + threadcount, NULL, 122*d60bafe5SSasha Levin thread_pool__threadfunc, NULL); 123*d60bafe5SSasha Levin 124*d60bafe5SSasha Levin if (res == 0) 125*d60bafe5SSasha Levin threadcount++; 126*d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 127*d60bafe5SSasha Levin 128*d60bafe5SSasha Levin return res; 129*d60bafe5SSasha Levin } 130*d60bafe5SSasha Levin 131*d60bafe5SSasha Levin int thread_pool__init(unsigned long thread_count) 132*d60bafe5SSasha Levin { 133*d60bafe5SSasha Levin unsigned long i; 134*d60bafe5SSasha Levin 135*d60bafe5SSasha Levin for (i = 0; i < thread_count; i++) 136*d60bafe5SSasha Levin if (thread_pool__addthread() < 0) 137*d60bafe5SSasha Levin return i; 138*d60bafe5SSasha Levin 139*d60bafe5SSasha Levin return i; 140*d60bafe5SSasha Levin } 141*d60bafe5SSasha Levin 142*d60bafe5SSasha Levin void *thread_pool__add_jobtype(struct kvm *kvm, 143*d60bafe5SSasha Levin kvm_thread_callback_fn_t callback, void *data) 144*d60bafe5SSasha Levin { 145*d60bafe5SSasha Levin struct thread_pool__job_info *job = calloc(1, sizeof(*job)); 146*d60bafe5SSasha Levin 147*d60bafe5SSasha Levin *job = (struct thread_pool__job_info) { 148*d60bafe5SSasha Levin .kvm = kvm, 149*d60bafe5SSasha Levin .data = data, 150*d60bafe5SSasha Levin .callback = callback, 151*d60bafe5SSasha Levin .mutex = PTHREAD_MUTEX_INITIALIZER 152*d60bafe5SSasha Levin }; 153*d60bafe5SSasha Levin 154*d60bafe5SSasha Levin return job; 155*d60bafe5SSasha Levin } 156*d60bafe5SSasha Levin 157*d60bafe5SSasha Levin void thread_pool__signal_work(void *job) 158*d60bafe5SSasha Levin { 159*d60bafe5SSasha Levin struct thread_pool__job_info *jobinfo = job; 160*d60bafe5SSasha Levin 161*d60bafe5SSasha Levin if (jobinfo == NULL) 162*d60bafe5SSasha Levin return; 163*d60bafe5SSasha Levin 164*d60bafe5SSasha Levin mutex_lock(&jobinfo->mutex); 165*d60bafe5SSasha Levin if (jobinfo->signalcount++ == 0) 166*d60bafe5SSasha Levin thread_pool__job_info_push_locked(job); 167*d60bafe5SSasha Levin mutex_unlock(&jobinfo->mutex); 168*d60bafe5SSasha Levin 169*d60bafe5SSasha Levin pthread_cond_signal(&job_cond); 170*d60bafe5SSasha Levin } 171