1d60bafe5SSasha Levin #include "kvm/threadpool.h" 2d60bafe5SSasha Levin #include "kvm/mutex.h" 3*49a8afd1SSasha Levin #include "kvm/kvm.h" 4d60bafe5SSasha Levin 5d60bafe5SSasha Levin #include <linux/kernel.h> 6d60bafe5SSasha Levin #include <linux/list.h> 7d60bafe5SSasha Levin #include <pthread.h> 8d60bafe5SSasha Levin #include <stdbool.h> 9d60bafe5SSasha Levin 10d60bafe5SSasha Levin static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; 11d60bafe5SSasha Levin static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; 12d60bafe5SSasha Levin static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 13d60bafe5SSasha Levin 14d60bafe5SSasha Levin static LIST_HEAD(head); 15d60bafe5SSasha Levin 16d60bafe5SSasha Levin static pthread_t *threads; 17d60bafe5SSasha Levin static long threadcount; 18f6a3c571SSasha Levin static bool running; 19d60bafe5SSasha Levin 20a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop_locked(void) 21d60bafe5SSasha Levin { 22f6a083e9SSasha Levin struct thread_pool__job *job; 23d60bafe5SSasha Levin 24d60bafe5SSasha Levin if (list_empty(&head)) 25d60bafe5SSasha Levin return NULL; 26d60bafe5SSasha Levin 27f6a083e9SSasha Levin job = list_first_entry(&head, struct thread_pool__job, queue); 28d60bafe5SSasha Levin list_del(&job->queue); 29d60bafe5SSasha Levin 30d60bafe5SSasha Levin return job; 31d60bafe5SSasha Levin } 32d60bafe5SSasha Levin 33a4cae5abSLai Jiangshan static void thread_pool__job_push_locked(struct thread_pool__job *job) 34d60bafe5SSasha Levin { 35d60bafe5SSasha Levin list_add_tail(&job->queue, &head); 36d60bafe5SSasha Levin } 37d60bafe5SSasha Levin 38a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop(void) 39d60bafe5SSasha Levin { 40f6a083e9SSasha Levin struct thread_pool__job *job; 41d60bafe5SSasha Levin 42d60bafe5SSasha Levin mutex_lock(&job_mutex); 43a4cae5abSLai Jiangshan job = thread_pool__job_pop_locked(); 44d60bafe5SSasha Levin mutex_unlock(&job_mutex); 45d60bafe5SSasha Levin return job; 46d60bafe5SSasha Levin } 47d60bafe5SSasha Levin 48a4cae5abSLai Jiangshan static void thread_pool__job_push(struct thread_pool__job *job) 49d60bafe5SSasha Levin { 50d60bafe5SSasha Levin mutex_lock(&job_mutex); 51a4cae5abSLai Jiangshan thread_pool__job_push_locked(job); 52d60bafe5SSasha Levin mutex_unlock(&job_mutex); 53d60bafe5SSasha Levin } 54d60bafe5SSasha Levin 55f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job) 56d60bafe5SSasha Levin { 57d60bafe5SSasha Levin while (job) { 58d60bafe5SSasha Levin job->callback(job->kvm, job->data); 59d60bafe5SSasha Levin 60d60bafe5SSasha Levin mutex_lock(&job->mutex); 61d60bafe5SSasha Levin 62d60bafe5SSasha Levin if (--job->signalcount > 0) 63d60bafe5SSasha Levin /* If the job was signaled again while we were working */ 64a4cae5abSLai Jiangshan thread_pool__job_push(job); 65d60bafe5SSasha Levin 66d60bafe5SSasha Levin mutex_unlock(&job->mutex); 67d60bafe5SSasha Levin 68a4cae5abSLai Jiangshan job = thread_pool__job_pop(); 69d60bafe5SSasha Levin } 70d60bafe5SSasha Levin } 71d60bafe5SSasha Levin 72d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param) 73d60bafe5SSasha Levin { 74d60bafe5SSasha Levin mutex_unlock(&job_mutex); 75d60bafe5SSasha Levin } 76d60bafe5SSasha Levin 77d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param) 78d60bafe5SSasha Levin { 79d60bafe5SSasha Levin pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 80d60bafe5SSasha Levin 81f6a3c571SSasha Levin while (running) { 82feceecd7SSasha Levin struct thread_pool__job *curjob = NULL; 83d60bafe5SSasha Levin 84d60bafe5SSasha Levin mutex_lock(&job_mutex); 85f6a3c571SSasha Levin while (running && (curjob = thread_pool__job_pop_locked()) == NULL) 86d60bafe5SSasha Levin pthread_cond_wait(&job_cond, &job_mutex); 87d60bafe5SSasha Levin mutex_unlock(&job_mutex); 88d60bafe5SSasha Levin 89f6a3c571SSasha Levin if (running) 90d60bafe5SSasha Levin thread_pool__handle_job(curjob); 91d60bafe5SSasha Levin } 92d60bafe5SSasha Levin 93d60bafe5SSasha Levin pthread_cleanup_pop(0); 94d60bafe5SSasha Levin 95d60bafe5SSasha Levin return NULL; 96d60bafe5SSasha Levin } 97d60bafe5SSasha Levin 98d60bafe5SSasha Levin static int thread_pool__addthread(void) 99d60bafe5SSasha Levin { 100d60bafe5SSasha Levin int res; 101d60bafe5SSasha Levin void *newthreads; 102d60bafe5SSasha Levin 103d60bafe5SSasha Levin mutex_lock(&thread_mutex); 104d60bafe5SSasha Levin newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 105d60bafe5SSasha Levin if (newthreads == NULL) { 106d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 107d60bafe5SSasha Levin return -1; 108d60bafe5SSasha Levin } 109d60bafe5SSasha Levin 110d60bafe5SSasha Levin threads = newthreads; 111d60bafe5SSasha Levin 112d60bafe5SSasha Levin res = pthread_create(threads + threadcount, NULL, 113d60bafe5SSasha Levin thread_pool__threadfunc, NULL); 114d60bafe5SSasha Levin 115d60bafe5SSasha Levin if (res == 0) 116d60bafe5SSasha Levin threadcount++; 117d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 118d60bafe5SSasha Levin 119d60bafe5SSasha Levin return res; 120d60bafe5SSasha Levin } 121d60bafe5SSasha Levin 122f6a3c571SSasha Levin int thread_pool__init(struct kvm *kvm) 123d60bafe5SSasha Levin { 124d60bafe5SSasha Levin unsigned long i; 125f6a3c571SSasha Levin unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); 126f6a3c571SSasha Levin 127f6a3c571SSasha Levin running = true; 128d60bafe5SSasha Levin 129d60bafe5SSasha Levin for (i = 0; i < thread_count; i++) 130d60bafe5SSasha Levin if (thread_pool__addthread() < 0) 131d60bafe5SSasha Levin return i; 132d60bafe5SSasha Levin 133d60bafe5SSasha Levin return i; 134d60bafe5SSasha Levin } 135*49a8afd1SSasha Levin late_init(thread_pool__init); 136d60bafe5SSasha Levin 137f6a3c571SSasha Levin int thread_pool__exit(struct kvm *kvm) 138f6a3c571SSasha Levin { 139f6a3c571SSasha Levin int i; 140f6a3c571SSasha Levin void *NUL = NULL; 141f6a3c571SSasha Levin 142f6a3c571SSasha Levin running = false; 143f6a3c571SSasha Levin 144f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) { 145f6a3c571SSasha Levin mutex_lock(&job_mutex); 146f6a3c571SSasha Levin pthread_cond_signal(&job_cond); 147f6a3c571SSasha Levin mutex_unlock(&job_mutex); 148f6a3c571SSasha Levin } 149f6a3c571SSasha Levin 150f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) { 151f6a3c571SSasha Levin pthread_join(threads[i], NUL); 152f6a3c571SSasha Levin } 153f6a3c571SSasha Levin 154f6a3c571SSasha Levin return 0; 155f6a3c571SSasha Levin } 156*49a8afd1SSasha Levin late_exit(thread_pool__exit); 157f6a3c571SSasha Levin 158df0c7f57SSasha Levin void thread_pool__do_job(struct thread_pool__job *job) 159d60bafe5SSasha Levin { 160f6a083e9SSasha Levin struct thread_pool__job *jobinfo = job; 161d60bafe5SSasha Levin 162df0c7f57SSasha Levin if (jobinfo == NULL || jobinfo->callback == NULL) 163d60bafe5SSasha Levin return; 164d60bafe5SSasha Levin 165d60bafe5SSasha Levin mutex_lock(&jobinfo->mutex); 166d60bafe5SSasha Levin if (jobinfo->signalcount++ == 0) 167a4cae5abSLai Jiangshan thread_pool__job_push(job); 168d60bafe5SSasha Levin mutex_unlock(&jobinfo->mutex); 169d60bafe5SSasha Levin 1709449cd3cSSasha Levin mutex_lock(&job_mutex); 171d60bafe5SSasha Levin pthread_cond_signal(&job_cond); 1729449cd3cSSasha Levin mutex_unlock(&job_mutex); 173d60bafe5SSasha Levin } 174