1d60bafe5SSasha Levin #include "kvm/threadpool.h" 2d60bafe5SSasha Levin #include "kvm/mutex.h" 349a8afd1SSasha Levin #include "kvm/kvm.h" 4d60bafe5SSasha Levin 5d60bafe5SSasha Levin #include <linux/kernel.h> 6d60bafe5SSasha Levin #include <linux/list.h> 7d60bafe5SSasha Levin #include <pthread.h> 8d60bafe5SSasha Levin #include <stdbool.h> 9d60bafe5SSasha Levin 10*d3476f7dSSasha Levin static DEFINE_MUTEX(job_mutex); 11*d3476f7dSSasha Levin static DEFINE_MUTEX(thread_mutex); 12d60bafe5SSasha Levin static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; 13d60bafe5SSasha Levin 14d60bafe5SSasha Levin static LIST_HEAD(head); 15d60bafe5SSasha Levin 16d60bafe5SSasha Levin static pthread_t *threads; 17d60bafe5SSasha Levin static long threadcount; 18f6a3c571SSasha Levin static bool running; 19d60bafe5SSasha Levin 20a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop_locked(void) 21d60bafe5SSasha Levin { 22f6a083e9SSasha Levin struct thread_pool__job *job; 23d60bafe5SSasha Levin 24d60bafe5SSasha Levin if (list_empty(&head)) 25d60bafe5SSasha Levin return NULL; 26d60bafe5SSasha Levin 27f6a083e9SSasha Levin job = list_first_entry(&head, struct thread_pool__job, queue); 28d60bafe5SSasha Levin list_del(&job->queue); 29d60bafe5SSasha Levin 30d60bafe5SSasha Levin return job; 31d60bafe5SSasha Levin } 32d60bafe5SSasha Levin 33a4cae5abSLai Jiangshan static void thread_pool__job_push_locked(struct thread_pool__job *job) 34d60bafe5SSasha Levin { 35d60bafe5SSasha Levin list_add_tail(&job->queue, &head); 36d60bafe5SSasha Levin } 37d60bafe5SSasha Levin 38a4cae5abSLai Jiangshan static struct thread_pool__job *thread_pool__job_pop(void) 39d60bafe5SSasha Levin { 40f6a083e9SSasha Levin struct thread_pool__job *job; 41d60bafe5SSasha Levin 42d60bafe5SSasha Levin mutex_lock(&job_mutex); 43a4cae5abSLai Jiangshan job = thread_pool__job_pop_locked(); 44d60bafe5SSasha Levin mutex_unlock(&job_mutex); 45d60bafe5SSasha Levin return job; 46d60bafe5SSasha Levin } 47d60bafe5SSasha Levin 48a4cae5abSLai Jiangshan static void thread_pool__job_push(struct thread_pool__job *job) 49d60bafe5SSasha Levin { 50d60bafe5SSasha Levin mutex_lock(&job_mutex); 51a4cae5abSLai Jiangshan thread_pool__job_push_locked(job); 52d60bafe5SSasha Levin mutex_unlock(&job_mutex); 53d60bafe5SSasha Levin } 54d60bafe5SSasha Levin 55f6a083e9SSasha Levin static void thread_pool__handle_job(struct thread_pool__job *job) 56d60bafe5SSasha Levin { 57d60bafe5SSasha Levin while (job) { 58d60bafe5SSasha Levin job->callback(job->kvm, job->data); 59d60bafe5SSasha Levin 60d60bafe5SSasha Levin mutex_lock(&job->mutex); 61d60bafe5SSasha Levin 62d60bafe5SSasha Levin if (--job->signalcount > 0) 63d60bafe5SSasha Levin /* If the job was signaled again while we were working */ 64a4cae5abSLai Jiangshan thread_pool__job_push(job); 65d60bafe5SSasha Levin 66d60bafe5SSasha Levin mutex_unlock(&job->mutex); 67d60bafe5SSasha Levin 68a4cae5abSLai Jiangshan job = thread_pool__job_pop(); 69d60bafe5SSasha Levin } 70d60bafe5SSasha Levin } 71d60bafe5SSasha Levin 72d60bafe5SSasha Levin static void thread_pool__threadfunc_cleanup(void *param) 73d60bafe5SSasha Levin { 74d60bafe5SSasha Levin mutex_unlock(&job_mutex); 75d60bafe5SSasha Levin } 76d60bafe5SSasha Levin 77d60bafe5SSasha Levin static void *thread_pool__threadfunc(void *param) 78d60bafe5SSasha Levin { 79d60bafe5SSasha Levin pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); 80d60bafe5SSasha Levin 81a4d8c55eSSasha Levin kvm__set_thread_name("threadpool-worker"); 82a4d8c55eSSasha Levin 83f6a3c571SSasha Levin while (running) { 84feceecd7SSasha Levin struct thread_pool__job *curjob = NULL; 85d60bafe5SSasha Levin 86d60bafe5SSasha Levin mutex_lock(&job_mutex); 87f6a3c571SSasha Levin while (running && (curjob = thread_pool__job_pop_locked()) == NULL) 88*d3476f7dSSasha Levin pthread_cond_wait(&job_cond, &job_mutex.mutex); 89d60bafe5SSasha Levin mutex_unlock(&job_mutex); 90d60bafe5SSasha Levin 91f6a3c571SSasha Levin if (running) 92d60bafe5SSasha Levin thread_pool__handle_job(curjob); 93d60bafe5SSasha Levin } 94d60bafe5SSasha Levin 95d60bafe5SSasha Levin pthread_cleanup_pop(0); 96d60bafe5SSasha Levin 97d60bafe5SSasha Levin return NULL; 98d60bafe5SSasha Levin } 99d60bafe5SSasha Levin 100d60bafe5SSasha Levin static int thread_pool__addthread(void) 101d60bafe5SSasha Levin { 102d60bafe5SSasha Levin int res; 103d60bafe5SSasha Levin void *newthreads; 104d60bafe5SSasha Levin 105d60bafe5SSasha Levin mutex_lock(&thread_mutex); 106d60bafe5SSasha Levin newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); 107d60bafe5SSasha Levin if (newthreads == NULL) { 108d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 109d60bafe5SSasha Levin return -1; 110d60bafe5SSasha Levin } 111d60bafe5SSasha Levin 112d60bafe5SSasha Levin threads = newthreads; 113d60bafe5SSasha Levin 114d60bafe5SSasha Levin res = pthread_create(threads + threadcount, NULL, 115d60bafe5SSasha Levin thread_pool__threadfunc, NULL); 116d60bafe5SSasha Levin 117d60bafe5SSasha Levin if (res == 0) 118d60bafe5SSasha Levin threadcount++; 119d60bafe5SSasha Levin mutex_unlock(&thread_mutex); 120d60bafe5SSasha Levin 121d60bafe5SSasha Levin return res; 122d60bafe5SSasha Levin } 123d60bafe5SSasha Levin 124f6a3c571SSasha Levin int thread_pool__init(struct kvm *kvm) 125d60bafe5SSasha Levin { 126d60bafe5SSasha Levin unsigned long i; 127f6a3c571SSasha Levin unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); 128f6a3c571SSasha Levin 129f6a3c571SSasha Levin running = true; 130d60bafe5SSasha Levin 131d60bafe5SSasha Levin for (i = 0; i < thread_count; i++) 132d60bafe5SSasha Levin if (thread_pool__addthread() < 0) 133d60bafe5SSasha Levin return i; 134d60bafe5SSasha Levin 135d60bafe5SSasha Levin return i; 136d60bafe5SSasha Levin } 13749a8afd1SSasha Levin late_init(thread_pool__init); 138d60bafe5SSasha Levin 139f6a3c571SSasha Levin int thread_pool__exit(struct kvm *kvm) 140f6a3c571SSasha Levin { 141f6a3c571SSasha Levin int i; 142f6a3c571SSasha Levin void *NUL = NULL; 143f6a3c571SSasha Levin 144f6a3c571SSasha Levin running = false; 145f6a3c571SSasha Levin 146f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) { 147f6a3c571SSasha Levin mutex_lock(&job_mutex); 148f6a3c571SSasha Levin pthread_cond_signal(&job_cond); 149f6a3c571SSasha Levin mutex_unlock(&job_mutex); 150f6a3c571SSasha Levin } 151f6a3c571SSasha Levin 152f6a3c571SSasha Levin for (i = 0; i < threadcount; i++) { 153f6a3c571SSasha Levin pthread_join(threads[i], NUL); 154f6a3c571SSasha Levin } 155f6a3c571SSasha Levin 156f6a3c571SSasha Levin return 0; 157f6a3c571SSasha Levin } 15849a8afd1SSasha Levin late_exit(thread_pool__exit); 159f6a3c571SSasha Levin 160df0c7f57SSasha Levin void thread_pool__do_job(struct thread_pool__job *job) 161d60bafe5SSasha Levin { 162f6a083e9SSasha Levin struct thread_pool__job *jobinfo = job; 163d60bafe5SSasha Levin 164df0c7f57SSasha Levin if (jobinfo == NULL || jobinfo->callback == NULL) 165d60bafe5SSasha Levin return; 166d60bafe5SSasha Levin 167d60bafe5SSasha Levin mutex_lock(&jobinfo->mutex); 168d60bafe5SSasha Levin if (jobinfo->signalcount++ == 0) 169a4cae5abSLai Jiangshan thread_pool__job_push(job); 170d60bafe5SSasha Levin mutex_unlock(&jobinfo->mutex); 171d60bafe5SSasha Levin 1729449cd3cSSasha Levin mutex_lock(&job_mutex); 173d60bafe5SSasha Levin pthread_cond_signal(&job_cond); 1749449cd3cSSasha Levin mutex_unlock(&job_mutex); 175d60bafe5SSasha Levin } 176