1d60bafe5SSasha Levin #include "kvm/threadpool.h" 2d60bafe5SSasha Levin #include "kvm/mutex.h" 3d60bafe5SSasha Levin 4d60bafe5SSasha Levin #include <linux/kernel.h> 5d60bafe5SSasha Levin #include <linux/list.h> 6d60bafe5SSasha Levin #include <pthread.h> 7d60bafe5SSasha Levin #include <stdbool.h> 8d60bafe5SSasha Levin 9*f6a083e9SSasha Levin struct thread_pool__job { 10d60bafe5SSasha Levin kvm_thread_callback_fn_t callback; 11d60bafe5SSasha Levin struct kvm *kvm; 12d60bafe5SSasha Levin void *data; 13d60bafe5SSasha Levin 14d60bafe5SSasha Levin int signalcount; 15d60bafe5SSasha Levin pthread_mutex_t mutex; 16d60bafe5SSasha Levin 17d60bafe5SSasha Levin struct list_head queue; 18d60bafe5SSasha Levin }; 19d60bafe5SSasha Levin 20d60bafe5SSasha Levin static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 21d60bafe5SSasha Levin static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 22d60bafe5SSasha Levin static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 23d60bafe5SSasha Levin 24d60bafe5SSasha Levin static LIST_HEAD(head); 25d60bafe5SSasha Levin 26d60bafe5SSasha Levin static pthread_t *threads; 27d60bafe5SSasha Levin static long threadcount; 28d60bafe5SSasha Levin 29*f6a083e9SSasha Levin static struct thread_pool__job *thread_pool__job_pop(void) 30d60bafe5SSasha Levin { 31*f6a083e9SSasha Levin struct thread_pool__job *job; 32d60bafe5SSasha Levin 33d60bafe5SSasha Levin if (list_empty(&head)) 34d60bafe5SSasha Levin return NULL; 35d60bafe5SSasha Levin 36*f6a083e9SSasha Levin job = list_first_entry(&head, struct thread_pool__job, queue); 37d60bafe5SSasha Levin list_del(&job->queue); 38d60bafe5SSasha Levin 39d60bafe5SSasha Levin return job; 40d60bafe5SSasha Levin } 41d60bafe5SSasha Levin 42*f6a083e9SSasha Levin static void thread_pool__job_push(struct thread_pool__job *job) 43d60bafe5SSasha Levin { 44d60bafe5SSasha Levin list_add_tail(&job->queue, &head); 45d60bafe5SSasha Levin } 46d60bafe5SSasha Levin 47*f6a083e9SSasha Levin static struct thread_pool__job *thread_pool__job_pop_locked(void) 48d60bafe5SSasha Levin { 49*f6a083e9SSasha Levin struct thread_pool__job *job; 50d60bafe5SSasha Levin 51d60bafe5SSasha Levin mutex_lock(&job_mutex); 52*f6a083e9SSasha Levin job = thread_pool__job_pop(); 53d60bafe5SSasha Levin mutex_unlock(&job_mutex); 54d60bafe5SSasha Levin return job; 55d60bafe5SSasha Levin } 56d60bafe5SSasha Levin 57*f6a083e9SSasha Levin static void thread_pool__job_push_locked(struct thread_pool__job *job) 58d60bafe5SSasha Levin { 59d60bafe5SSasha Levin mutex_lock(&job_mutex); 60*f6a083e9SSasha Levin thread_pool__job_push(job); 61d60bafe5SSasha Levin mutex_unlock(&job_mutex); 62d60bafe5SSasha Levin } 63d60bafe5SSasha Levin 64*f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job) 65d60bafe5SSasha Levin { 66d60bafe5SSasha Levin while (job) { 67d60bafe5SSasha Levin job->callback(job->kvm, job->data); 68d60bafe5SSasha Levin 69d60bafe5SSasha Levin mutex_lock(&job->mutex); 70d60bafe5SSasha Levin 71d60bafe5SSasha Levin if (--job->signalcount > 0) 72d60bafe5SSasha Levin /* If the job was signaled again while we were working */ 73*f6a083e9SSasha Levin thread_pool__job_push_locked(job); 74d60bafe5SSasha Levin 75d60bafe5SSasha Levin mutex_unlock(&job->mutex); 76d60bafe5SSasha Levin 77*f6a083e9SSasha Levin job = thread_pool__job_pop_locked(); 78d60bafe5SSasha Levin } 79d60bafe5SSasha Levin } 80d60bafe5SSasha Levin 81d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param) 82d60bafe5SSasha Levin { 83d60bafe5SSasha Levin mutex_unlock(&job_mutex); 84d60bafe5SSasha Levin } 85d60bafe5SSasha Levin 86d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param) 87d60bafe5SSasha Levin { 88d60bafe5SSasha Levin pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 89d60bafe5SSasha Levin 90d60bafe5SSasha Levin for (;;) { 91*f6a083e9SSasha Levin struct thread_pool__job *curjob; 92d60bafe5SSasha Levin 93d60bafe5SSasha Levin mutex_lock(&job_mutex); 94d60bafe5SSasha Levin pthread_cond_wait(&job_cond, &job_mutex); 95*f6a083e9SSasha Levin curjob = thread_pool__job_pop(); 96d60bafe5SSasha Levin mutex_unlock(&job_mutex); 97d60bafe5SSasha Levin 98d60bafe5SSasha Levin if (curjob) 99d60bafe5SSasha Levin thread_pool__handle_job(curjob); 100d60bafe5SSasha Levin } 101d60bafe5SSasha Levin 102d60bafe5SSasha Levin pthread_cleanup_pop(0); 103d60bafe5SSasha Levin 104d60bafe5SSasha Levin return NULL; 105d60bafe5SSasha Levin } 106d60bafe5SSasha Levin 107d60bafe5SSasha Levin static int thread_pool__addthread(void) 108d60bafe5SSasha Levin { 109d60bafe5SSasha Levin int res; 110d60bafe5SSasha Levin void *newthreads; 111d60bafe5SSasha Levin 112d60bafe5SSasha Levin mutex_lock(&thread_mutex); 113d60bafe5SSasha Levin newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 114d60bafe5SSasha Levin if (newthreads == NULL) { 115d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 116d60bafe5SSasha Levin return -1; 117d60bafe5SSasha Levin } 118d60bafe5SSasha Levin 119d60bafe5SSasha Levin threads = newthreads; 120d60bafe5SSasha Levin 121d60bafe5SSasha Levin res = pthread_create(threads + threadcount, NULL, 122d60bafe5SSasha Levin thread_pool__threadfunc, NULL); 123d60bafe5SSasha Levin 124d60bafe5SSasha Levin if (res == 0) 125d60bafe5SSasha Levin threadcount++; 126d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 127d60bafe5SSasha Levin 128d60bafe5SSasha Levin return res; 129d60bafe5SSasha Levin } 130d60bafe5SSasha Levin 131d60bafe5SSasha Levin int thread_pool__init(unsigned long thread_count) 132d60bafe5SSasha Levin { 133d60bafe5SSasha Levin unsigned long i; 134d60bafe5SSasha Levin 135d60bafe5SSasha Levin for (i = 0; i < thread_count; i++) 136d60bafe5SSasha Levin if (thread_pool__addthread() < 0) 137d60bafe5SSasha Levin return i; 138d60bafe5SSasha Levin 139d60bafe5SSasha Levin return i; 140d60bafe5SSasha Levin } 141d60bafe5SSasha Levin 142*f6a083e9SSasha Levin void *thread_pool__add_job(struct kvm *kvm, 143d60bafe5SSasha Levin kvm_thread_callback_fn_t callback, void *data) 144d60bafe5SSasha Levin { 145*f6a083e9SSasha Levin struct thread_pool__job *job = calloc(1, sizeof(*job)); 146d60bafe5SSasha Levin 147*f6a083e9SSasha Levin *job = (struct thread_pool__job) { 148d60bafe5SSasha Levin .kvm = kvm, 149d60bafe5SSasha Levin .data = data, 150d60bafe5SSasha Levin .callback = callback, 151d60bafe5SSasha Levin .mutex = PTHREAD_MUTEX_INITIALIZER 152d60bafe5SSasha Levin }; 153d60bafe5SSasha Levin 154d60bafe5SSasha Levin return job; 155d60bafe5SSasha Levin } 156d60bafe5SSasha Levin 157*f6a083e9SSasha Levin void thread_pool__do_job(void *job) 158d60bafe5SSasha Levin { 159*f6a083e9SSasha Levin struct thread_pool__job *jobinfo = job; 160d60bafe5SSasha Levin 161d60bafe5SSasha Levin if (jobinfo == NULL) 162d60bafe5SSasha Levin return; 163d60bafe5SSasha Levin 164d60bafe5SSasha Levin mutex_lock(&jobinfo->mutex); 165d60bafe5SSasha Levin if (jobinfo->signalcount++ == 0) 166*f6a083e9SSasha Levin thread_pool__job_push_locked(job); 167d60bafe5SSasha Levin mutex_unlock(&jobinfo->mutex); 168d60bafe5SSasha Levin 169d60bafe5SSasha Levin pthread_cond_signal(&job_cond); 170d60bafe5SSasha Levin } 171