summaryrefslogtreecommitdiff
path: root/executor.c
diff options
context:
space:
mode:
authorSøren Sandmann <sandmann@redhat.com>2009-08-30 21:50:51 -0400
committerSøren Sandmann <sandmann@redhat.com>2009-08-30 21:50:51 -0400
commite947945d98e54fbd72c7764418b6556c7df7c7cc (patch)
tree88854705a9aa8b78a65f130f71ad3b1a5b89031b /executor.c
Initial checkinHEADmaster
Diffstat (limited to 'executor.c')
-rw-r--r--executor.c178
1 files changed, 178 insertions, 0 deletions
diff --git a/executor.c b/executor.c
new file mode 100644
index 0000000..a70c248
--- /dev/null
+++ b/executor.c
@@ -0,0 +1,178 @@
+#include "executor.h"
+
+struct Executor
+{
+ gboolean quitting;
+
+ GMutex * mutex;
+
+ GCond * new_job_cond;
+ GCond * idle_cond;
+ GCond * exit_cond;
+
+ GQueue * jobs;
+
+ int n_threads;
+ int n_idle_threads;
+ int n_exited_threads;
+};
+
+static gpointer executor_thread (gpointer);
+
+static void
+executor_lock (Executor *executor)
+{
+ g_mutex_lock (executor->mutex);
+}
+
+static void
+executor_unlock (Executor *executor)
+{
+ g_mutex_unlock (executor->mutex);
+}
+
+Executor *
+executor_new (guint n_threads)
+{
+ Executor *executor;
+ int i;
+
+ g_return_val_if_fail (n_threads > 0, NULL);
+
+ if (!g_thread_supported())
+ g_thread_init (NULL);
+
+ executor = g_new0 (Executor, 1);
+
+ executor->mutex = g_mutex_new ();
+ executor->jobs = g_queue_new ();
+
+ executor->new_job_cond = g_cond_new ();
+ executor->idle_cond = g_cond_new ();
+ executor->exit_cond = g_cond_new ();
+ executor->n_idle_threads = 0;
+ executor->n_exited_threads = 0;
+ executor->quitting = FALSE;
+ executor->n_threads = n_threads;
+
+ executor_lock (executor);
+
+ for (i = 0; i < n_threads; ++i)
+ g_thread_create (executor_thread, executor, TRUE, NULL);
+
+ executor_unlock (executor);
+
+ return executor;
+}
+
+static guint
+executor_n_jobs (Executor *executor)
+{
+ g_assert (g_queue_get_length (executor->jobs) % 2 == 0);
+
+ return (g_queue_get_length (executor->jobs)) / 2;
+}
+
+/* Wait until all previously submitted jobs have finished */
+void
+executor_sync (Executor *executor)
+{
+ executor_lock (executor);
+
+ g_cond_broadcast (executor->new_job_cond);
+
+ while (!(executor->n_idle_threads == executor->n_threads &&
+ executor_n_jobs (executor) == 0))
+ {
+ g_cond_wait (executor->idle_cond, executor->mutex);
+ }
+
+ executor_unlock (executor);
+}
+
+void
+executor_free (Executor *executor)
+{
+ executor_lock (executor);
+
+ executor->quitting = TRUE;
+
+ g_cond_broadcast (executor->new_job_cond);
+
+ while (executor->n_exited_threads != executor->n_threads)
+ g_cond_wait (executor->exit_cond, executor->mutex);
+
+ executor_unlock (executor);
+
+ while (!g_queue_is_empty (executor->jobs))
+ g_queue_pop_head (executor->jobs);
+
+ g_mutex_free (executor->mutex);
+ g_cond_free (executor->new_job_cond);
+ g_cond_free (executor->idle_cond);
+ g_cond_free (executor->exit_cond);
+ g_queue_free (executor->jobs);
+}
+
+void
+executor_add_job (Executor *exe,
+ ExecutorJob job,
+ gpointer data)
+{
+ executor_lock (exe);
+
+ g_queue_push_tail (exe->jobs, job);
+ g_queue_push_tail (exe->jobs, data);
+
+ g_cond_signal (exe->new_job_cond);
+
+ executor_unlock (exe);
+}
+
+static gpointer
+executor_thread (gpointer data)
+{
+ Executor *executor = data;
+
+ executor_lock (executor);
+
+ while (!executor->quitting)
+ {
+ if (!executor_n_jobs (executor))
+ {
+ executor->n_idle_threads++;
+
+ g_cond_signal (executor->idle_cond);
+
+ g_cond_wait (executor->new_job_cond, executor->mutex);
+
+ executor->n_idle_threads--;
+ }
+
+ if (executor->quitting)
+ break;
+
+ if (executor_n_jobs (executor))
+ {
+ ExecutorJob job;
+ gpointer data;
+
+ job = g_queue_pop_head (executor->jobs);
+ data = g_queue_pop_head (executor->jobs);
+
+ executor_unlock (executor);
+
+ job (data);
+
+ executor_lock (executor);
+ }
+ }
+
+ executor->n_exited_threads++;
+
+ g_cond_signal (executor->exit_cond);
+
+ executor_unlock (executor);
+
+ return NULL;
+}