diff options
author | Søren Sandmann <sandmann@redhat.com> | 2009-08-30 21:50:51 -0400 |
---|---|---|
committer | Søren Sandmann <sandmann@redhat.com> | 2009-08-30 21:50:51 -0400 |
commit | e947945d98e54fbd72c7764418b6556c7df7c7cc (patch) | |
tree | 88854705a9aa8b78a65f130f71ad3b1a5b89031b /executor.c |
Diffstat (limited to 'executor.c')
-rw-r--r-- | executor.c | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/executor.c b/executor.c new file mode 100644 index 0000000..a70c248 --- /dev/null +++ b/executor.c @@ -0,0 +1,178 @@ +#include "executor.h" + +struct Executor +{ + gboolean quitting; + + GMutex * mutex; + + GCond * new_job_cond; + GCond * idle_cond; + GCond * exit_cond; + + GQueue * jobs; + + int n_threads; + int n_idle_threads; + int n_exited_threads; +}; + +static gpointer executor_thread (gpointer); + +static void +executor_lock (Executor *executor) +{ + g_mutex_lock (executor->mutex); +} + +static void +executor_unlock (Executor *executor) +{ + g_mutex_unlock (executor->mutex); +} + +Executor * +executor_new (guint n_threads) +{ + Executor *executor; + int i; + + g_return_val_if_fail (n_threads > 0, NULL); + + if (!g_thread_supported()) + g_thread_init (NULL); + + executor = g_new0 (Executor, 1); + + executor->mutex = g_mutex_new (); + executor->jobs = g_queue_new (); + + executor->new_job_cond = g_cond_new (); + executor->idle_cond = g_cond_new (); + executor->exit_cond = g_cond_new (); + executor->n_idle_threads = 0; + executor->n_exited_threads = 0; + executor->quitting = FALSE; + executor->n_threads = n_threads; + + executor_lock (executor); + + for (i = 0; i < n_threads; ++i) + g_thread_create (executor_thread, executor, TRUE, NULL); + + executor_unlock (executor); + + return executor; +} + +static guint +executor_n_jobs (Executor *executor) +{ + g_assert (g_queue_get_length (executor->jobs) % 2 == 0); + + return (g_queue_get_length (executor->jobs)) / 2; +} + +/* Wait until all previously submitted jobs have finished */ +void +executor_sync (Executor *executor) +{ + executor_lock (executor); + + g_cond_broadcast (executor->new_job_cond); + + while (!(executor->n_idle_threads == executor->n_threads && + executor_n_jobs (executor) == 0)) + { + g_cond_wait (executor->idle_cond, executor->mutex); + } + + executor_unlock (executor); +} + +void +executor_free (Executor *executor) +{ + executor_lock (executor); + + executor->quitting = TRUE; + + g_cond_broadcast (executor->new_job_cond); + + while (executor->n_exited_threads != executor->n_threads) + g_cond_wait (executor->exit_cond, executor->mutex); + + executor_unlock (executor); + + while (!g_queue_is_empty (executor->jobs)) + g_queue_pop_head (executor->jobs); + + g_mutex_free (executor->mutex); + g_cond_free (executor->new_job_cond); + g_cond_free (executor->idle_cond); + g_cond_free (executor->exit_cond); + g_queue_free (executor->jobs); +} + +void +executor_add_job (Executor *exe, + ExecutorJob job, + gpointer data) +{ + executor_lock (exe); + + g_queue_push_tail (exe->jobs, job); + g_queue_push_tail (exe->jobs, data); + + g_cond_signal (exe->new_job_cond); + + executor_unlock (exe); +} + +static gpointer +executor_thread (gpointer data) +{ + Executor *executor = data; + + executor_lock (executor); + + while (!executor->quitting) + { + if (!executor_n_jobs (executor)) + { + executor->n_idle_threads++; + + g_cond_signal (executor->idle_cond); + + g_cond_wait (executor->new_job_cond, executor->mutex); + + executor->n_idle_threads--; + } + + if (executor->quitting) + break; + + if (executor_n_jobs (executor)) + { + ExecutorJob job; + gpointer data; + + job = g_queue_pop_head (executor->jobs); + data = g_queue_pop_head (executor->jobs); + + executor_unlock (executor); + + job (data); + + executor_lock (executor); + } + } + + executor->n_exited_threads++; + + g_cond_signal (executor->exit_cond); + + executor_unlock (executor); + + return NULL; +} |