1d60bafe5SSasha Levin #include "kvm/threadpool.h" 2d60bafe5SSasha Levin #include "kvm/mutex.h" 3d60bafe5SSasha Levin 4d60bafe5SSasha Levin #include <linux/kernel.h> 5d60bafe5SSasha Levin #include <linux/list.h> 6d60bafe5SSasha Levin #include <pthread.h> 7d60bafe5SSasha Levin #include <stdbool.h> 8d60bafe5SSasha Levin 9d60bafe5SSasha Levin static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 10d60bafe5SSasha Levin static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 11d60bafe5SSasha Levin static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 12d60bafe5SSasha Levin 13d60bafe5SSasha Levin static LIST_HEAD(head); 14d60bafe5SSasha Levin 15d60bafe5SSasha Levin static pthread_t *threads; 16d60bafe5SSasha Levin static long threadcount; 17f6a3c571SSasha Levin static bool running; 18d60bafe5SSasha Levin 19a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop_locked(void) 20d60bafe5SSasha Levin { 21f6a083e9SSasha Levin struct thread_pool__job *job; 22d60bafe5SSasha Levin 23d60bafe5SSasha Levin if (list_empty(&head)) 24d60bafe5SSasha Levin return NULL; 25d60bafe5SSasha Levin 26f6a083e9SSasha Levin job = list_first_entry(&head, struct thread_pool__job, queue); 27d60bafe5SSasha Levin list_del(&job->queue); 28d60bafe5SSasha Levin 29d60bafe5SSasha Levin return job; 30d60bafe5SSasha Levin } 31d60bafe5SSasha Levin 32a4cae5abSLai Jiangshan static void thread_pool__job_push_locked(struct thread_pool__job *job) 33d60bafe5SSasha Levin { 34d60bafe5SSasha Levin list_add_tail(&job->queue, &head); 35d60bafe5SSasha Levin } 36d60bafe5SSasha Levin 37a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop(void) 38d60bafe5SSasha Levin { 39f6a083e9SSasha Levin struct thread_pool__job *job; 40d60bafe5SSasha Levin 41d60bafe5SSasha Levin mutex_lock(&job_mutex); 42a4cae5abSLai Jiangshan job = thread_pool__job_pop_locked(); 43d60bafe5SSasha Levin mutex_unlock(&job_mutex); 44d60bafe5SSasha Levin return job; 45d60bafe5SSasha Levin } 46d60bafe5SSasha Levin 47a4cae5abSLai Jiangshan static void thread_pool__job_push(struct thread_pool__job *job) 48d60bafe5SSasha Levin { 49d60bafe5SSasha Levin mutex_lock(&job_mutex); 50a4cae5abSLai Jiangshan thread_pool__job_push_locked(job); 51d60bafe5SSasha Levin mutex_unlock(&job_mutex); 52d60bafe5SSasha Levin } 53d60bafe5SSasha Levin 54f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job) 55d60bafe5SSasha Levin { 56d60bafe5SSasha Levin while (job) { 57d60bafe5SSasha Levin job->callback(job->kvm, job->data); 58d60bafe5SSasha Levin 59d60bafe5SSasha Levin mutex_lock(&job->mutex); 60d60bafe5SSasha Levin 61d60bafe5SSasha Levin if (--job->signalcount > 0) 62d60bafe5SSasha Levin /* If the job was signaled again while we were working */ 63a4cae5abSLai Jiangshan thread_pool__job_push(job); 64d60bafe5SSasha Levin 65d60bafe5SSasha Levin mutex_unlock(&job->mutex); 66d60bafe5SSasha Levin 67a4cae5abSLai Jiangshan job = thread_pool__job_pop(); 68d60bafe5SSasha Levin } 69d60bafe5SSasha Levin } 70d60bafe5SSasha Levin 71d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param) 72d60bafe5SSasha Levin { 73d60bafe5SSasha Levin mutex_unlock(&job_mutex); 74d60bafe5SSasha Levin } 75d60bafe5SSasha Levin 76d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param) 77d60bafe5SSasha Levin { 78d60bafe5SSasha Levin pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 79d60bafe5SSasha Levin 80f6a3c571SSasha Levin while (running) { 81*feceecd7SSasha Levin struct thread_pool__job *curjob = NULL; 82d60bafe5SSasha Levin 83d60bafe5SSasha Levin mutex_lock(&job_mutex); 84f6a3c571SSasha Levin while (running && (curjob = thread_pool__job_pop_locked()) == NULL) 85d60bafe5SSasha Levin pthread_cond_wait(&job_cond, &job_mutex); 86d60bafe5SSasha Levin mutex_unlock(&job_mutex); 87d60bafe5SSasha Levin 88f6a3c571SSasha Levin if (running) 89d60bafe5SSasha Levin thread_pool__handle_job(curjob); 90d60bafe5SSasha Levin } 91d60bafe5SSasha Levin 92d60bafe5SSasha Levin pthread_cleanup_pop(0); 93d60bafe5SSasha Levin 94d60bafe5SSasha Levin return NULL; 95d60bafe5SSasha Levin } 96d60bafe5SSasha Levin 97d60bafe5SSasha Levin static int thread_pool__addthread(void) 98d60bafe5SSasha Levin { 99d60bafe5SSasha Levin int res; 100d60bafe5SSasha Levin void *newthreads; 101d60bafe5SSasha Levin 102d60bafe5SSasha Levin mutex_lock(&thread_mutex); 103d60bafe5SSasha Levin newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 104d60bafe5SSasha Levin if (newthreads == NULL) { 105d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 106d60bafe5SSasha Levin return -1; 107d60bafe5SSasha Levin } 108d60bafe5SSasha Levin 109d60bafe5SSasha Levin threads = newthreads; 110d60bafe5SSasha Levin 111d60bafe5SSasha Levin res = pthread_create(threads + threadcount, NULL, 112d60bafe5SSasha Levin thread_pool__threadfunc, NULL); 113d60bafe5SSasha Levin 114d60bafe5SSasha Levin if (res == 0) 115d60bafe5SSasha Levin threadcount++; 116d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 117d60bafe5SSasha Levin 118d60bafe5SSasha Levin return res; 119d60bafe5SSasha Levin } 120d60bafe5SSasha Levin 121f6a3c571SSasha Levin int thread_pool__init(struct kvm *kvm) 122d60bafe5SSasha Levin { 123d60bafe5SSasha Levin unsigned long i; 124f6a3c571SSasha Levin unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); 125f6a3c571SSasha Levin 126f6a3c571SSasha Levin running = true; 127d60bafe5SSasha Levin 128d60bafe5SSasha Levin for (i = 0; i < thread_count; i++) 129d60bafe5SSasha Levin if (thread_pool__addthread() < 0) 130d60bafe5SSasha Levin return i; 131d60bafe5SSasha Levin 132d60bafe5SSasha Levin return i; 133d60bafe5SSasha Levin } 134d60bafe5SSasha Levin 135f6a3c571SSasha Levin int thread_pool__exit(struct kvm *kvm) 136f6a3c571SSasha Levin { 137f6a3c571SSasha Levin int i; 138f6a3c571SSasha Levin void *NUL = NULL; 139f6a3c571SSasha Levin 140f6a3c571SSasha Levin running = false; 141f6a3c571SSasha Levin 142f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) { 143f6a3c571SSasha Levin mutex_lock(&job_mutex); 144f6a3c571SSasha Levin pthread_cond_signal(&job_cond); 145f6a3c571SSasha Levin mutex_unlock(&job_mutex); 146f6a3c571SSasha Levin } 147f6a3c571SSasha Levin 148f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) { 149f6a3c571SSasha Levin pthread_join(threads[i], NUL); 150f6a3c571SSasha Levin } 151f6a3c571SSasha Levin 152f6a3c571SSasha Levin return 0; 153f6a3c571SSasha Levin } 154f6a3c571SSasha Levin 155df0c7f57SSasha Levin void thread_pool__do_job(struct thread_pool__job *job) 156d60bafe5SSasha Levin { 157f6a083e9SSasha Levin struct thread_pool__job *jobinfo = job; 158d60bafe5SSasha Levin 159df0c7f57SSasha Levin if (jobinfo == NULL || jobinfo->callback == NULL) 160d60bafe5SSasha Levin return; 161d60bafe5SSasha Levin 162d60bafe5SSasha Levin mutex_lock(&jobinfo->mutex); 163d60bafe5SSasha Levin if (jobinfo->signalcount++ == 0) 164a4cae5abSLai Jiangshan thread_pool__job_push(job); 165d60bafe5SSasha Levin mutex_unlock(&jobinfo->mutex); 166d60bafe5SSasha Levin 1679449cd3cSSasha Levin mutex_lock(&job_mutex); 168d60bafe5SSasha Levin pthread_cond_signal(&job_cond); 1699449cd3cSSasha Levin mutex_unlock(&job_mutex); 170d60bafe5SSasha Levin } 171