#include "executor.h"

/*
 * Executor: a fixed-size pool of worker threads that run queued jobs.
 *
 * All fields are protected by `mutex`.  Jobs are stored in `jobs` as two
 * consecutive queue entries: the job function followed by its data pointer.
 */
struct Executor
{
  gboolean   quitting;          /* set by executor_free() to make workers exit */
  GMutex    *mutex;             /* guards every other field */
  GCond     *new_job_cond;      /* signalled when a job is queued, on sync, and on shutdown */
  GCond     *idle_cond;         /* signalled by a worker each time it becomes idle */
  GCond     *exit_cond;         /* signalled by a worker right before it returns */
  GQueue    *jobs;              /* alternating (function, data) entries */
  GThread  **threads;           /* handles of the workers, joined in executor_free() */
  int        n_threads;         /* number of workers that were actually started */
  int        n_idle_threads;    /* workers currently waiting for a job */
  int        n_exited_threads;  /* workers that have left their main loop */
};

static gpointer executor_thread (gpointer data);

/* Acquire the executor's mutex. */
static void
executor_lock (Executor *executor)
{
  g_mutex_lock (executor->mutex);
}

/* Release the executor's mutex. */
static void
executor_unlock (Executor *executor)
{
  g_mutex_unlock (executor->mutex);
}

/*
 * Release every resource owned by the executor, including the struct itself.
 * All worker threads must already have been joined (or never started).
 */
static void
executor_release (Executor *executor)
{
  g_mutex_free (executor->mutex);
  g_cond_free (executor->new_job_cond);
  g_cond_free (executor->idle_cond);
  g_cond_free (executor->exit_cond);

  /* g_queue_free() releases the queue links; jobs still pending at this
   * point are dropped without being run. */
  g_queue_free (executor->jobs);

  g_free (executor->threads);
  g_free (executor);
}

/*
 * Create an executor with n_threads worker threads.
 *
 * Returns NULL if n_threads is 0 or if no worker thread could be started.
 * If only some of the workers could be started, the executor runs with the
 * ones that were.  Free the result with executor_free().
 */
Executor *
executor_new (guint n_threads)
{
  Executor *executor;
  guint i;
  int n_created = 0;

  g_return_val_if_fail (n_threads > 0, NULL);

  if (!g_thread_supported ())
    g_thread_init (NULL);

  executor = g_new0 (Executor, 1);

  executor->mutex = g_mutex_new ();
  executor->jobs = g_queue_new ();
  executor->new_job_cond = g_cond_new ();
  executor->idle_cond = g_cond_new ();
  executor->exit_cond = g_cond_new ();
  executor->threads = g_new0 (GThread *, n_threads);
  executor->n_idle_threads = 0;
  executor->n_exited_threads = 0;
  executor->quitting = FALSE;

  /* Hold the lock while spawning so that no worker touches the executor
   * before it is completely set up. */
  executor_lock (executor);

  for (i = 0; i < n_threads; ++i)
    {
      GThread *thread;

      thread = g_thread_create (executor_thread, executor, TRUE, NULL);

      /* Count only workers that really exist; executor_sync() and
       * executor_free() would otherwise wait forever for the missing ones. */
      if (thread != NULL)
        executor->threads[n_created++] = thread;
    }

  executor->n_threads = n_created;

  executor_unlock (executor);

  if (n_created == 0)
    {
      g_warning ("executor_new: could not create any worker thread");
      executor_release (executor);
      return NULL;
    }

  return executor;
}

/* Number of queued jobs.  Must be called with the executor locked. */
static guint
executor_n_jobs (Executor *executor)
{
  guint n_entries = g_queue_get_length (executor->jobs);

  /* Each job occupies two queue entries: function and data. */
  g_assert (n_entries % 2 == 0);

  return n_entries / 2;
}

/*
 * Wait until all previously submitted jobs have finished, i.e. until the
 * queue is empty and every worker is idle.
 *
 * Must not be called from within a job: the calling worker would never be
 * counted as idle, so the wait could not complete.
 */
void
executor_sync (Executor *executor)
{
  g_return_if_fail (executor != NULL);

  executor_lock (executor);

  /* Wake every idle worker; each one signals idle_cond again when it goes
   * back to sleep, which drives the re-evaluation of the loop below. */
  g_cond_broadcast (executor->new_job_cond);

  while (!(executor->n_idle_threads == executor->n_threads &&
           executor_n_jobs (executor) == 0))
    {
      g_cond_wait (executor->idle_cond, executor->mutex);
    }

  executor_unlock (executor);
}

/*
 * Shut the executor down and free it.
 *
 * Workers finish the job they are currently running, then exit; jobs still
 * in the queue are discarded.  Must not be called from within a job, since
 * the function waits for every worker (including the caller) to exit.
 */
void
executor_free (Executor *executor)
{
  int i;

  g_return_if_fail (executor != NULL);

  executor_lock (executor);

  executor->quitting = TRUE;
  g_cond_broadcast (executor->new_job_cond);

  while (executor->n_exited_threads != executor->n_threads)
    g_cond_wait (executor->exit_cond, executor->mutex);

  executor_unlock (executor);

  /* The workers were created joinable.  Joining reclaims their resources
   * and guarantees that none of them is still inside g_mutex_unlock() when
   * the mutex is destroyed. */
  for (i = 0; i < executor->n_threads; ++i)
    g_thread_join (executor->threads[i]);

  executor_release (executor);
}

/*
 * Queue job to be called with data on one of the worker threads.
 */
void
executor_add_job (Executor    *exe,
                  ExecutorJob  job,
                  gpointer     data)
{
  g_return_if_fail (exe != NULL);
  g_return_if_fail (job != NULL);

  executor_lock (exe);

  g_queue_push_tail (exe->jobs, job);
  g_queue_push_tail (exe->jobs, data);

  g_cond_signal (exe->new_job_cond);

  executor_unlock (exe);
}

/*
 * Worker thread main loop: run queued jobs until the executor is quitting.
 * `data` is the owning Executor.
 */
static gpointer
executor_thread (gpointer data)
{
  Executor *executor = data;

  executor_lock (executor);

  while (!executor->quitting)
    {
      if (!executor_n_jobs (executor))
        {
          /* Nothing to do: announce idleness and sleep.  A spurious
           * wakeup is harmless because the loop re-checks the queue. */
          executor->n_idle_threads++;
          g_cond_signal (executor->idle_cond);
          g_cond_wait (executor->new_job_cond, executor->mutex);
          executor->n_idle_threads--;
        }

      if (executor->quitting)
        break;

      if (executor_n_jobs (executor))
        {
          ExecutorJob job;
          gpointer job_data;

          job = g_queue_pop_head (executor->jobs);
          job_data = g_queue_pop_head (executor->jobs);

          /* Run the job without the lock so other workers can proceed. */
          executor_unlock (executor);
          job (job_data);
          executor_lock (executor);
        }
    }

  executor->n_exited_threads++;
  g_cond_signal (executor->exit_cond);

  executor_unlock (executor);

  return NULL;
}