#include "jobqueue.h"

typedef struct Job Job;

/* One pending unit of work on a JobQueue. */
struct Job
{
  JobQueue    *queue;   /* queue the job was added to */
  ExecutorJob  job;     /* callback, invoked as job (data) */
  gpointer     data;    /* argument handed to the callback */
};

/*
 * A FIFO of jobs that is drained by run_queue() on an Executor.
 *
 * All fields other than `executor` are read and written with `mutex` held.
 */
struct JobQueue
{
  Executor *executor;     /* where run_queue() gets scheduled */
  GQueue   *jobs;         /* pending Job pointers, oldest first */
  GMutex   *mutex;        /* guards jobs, in_executor and ref_count */
  gboolean  in_executor;  /* TRUE from scheduling run_queue() until it finishes */
  int       ref_count;    /* 1 from job_queue_new() + 1 while in_executor */
};

/* Allocate a Job that records its queue, callback and callback argument. */
static Job *
job_new (JobQueue    *queue,
         ExecutorJob  func,
         gpointer     data)
{
  Job *job = g_new0 (Job, 1);

  job->job = func;
  job->queue = queue;
  job->data = data;

  return job;
}

/* Acquire the queue's mutex. */
static void
job_queue_lock (JobQueue *queue)
{
  g_mutex_lock (queue->mutex);
}

/* Release the queue's mutex. */
static void
job_queue_unlock (JobQueue *queue)
{
  g_mutex_unlock (queue->mutex);
}

/*
 * Release the queue's container, mutex and the JobQueue itself.
 *
 * Must only be called once the reference count has reached zero and with
 * the mutex unlocked.  Job structures still stored in the queue are not
 * freed here; callers drain the queue first.
 */
static void
delete_queue (JobQueue *queue)
{
  g_assert (queue->ref_count == 0);

  g_queue_free (queue->jobs);
  g_mutex_free (queue->mutex);
  g_free (queue);
}

/*
 * Create an empty job queue whose jobs will run on `executor`.
 *
 * The returned queue holds one reference, released by job_queue_free().
 */
JobQueue *
job_queue_new (Executor *executor)
{
  JobQueue *queue = g_new0 (JobQueue, 1);

  queue->executor = executor;
  queue->mutex = g_mutex_new ();
  queue->in_executor = FALSE;
  queue->ref_count = 1;
  queue->jobs = g_queue_new ();

  return queue;
}

/*
 * Executor callback: run every pending job in FIFO order.
 *
 * `data` is the JobQueue.  The mutex is dropped while a job's callback runs,
 * so jobs may be added or removed concurrently; the loop keeps going until
 * the queue is observed empty with the mutex held.  On exit the reference
 * taken by job_queue_add() is dropped and the queue is destroyed if that
 * was the last one.
 */
static void
run_queue (gpointer data)
{
  JobQueue *queue = data;
  gboolean do_free = FALSE;

  job_queue_lock (queue);

  g_assert (queue->in_executor == TRUE);

  while (!g_queue_is_empty (queue->jobs))
    {
      Job *job = g_queue_pop_head (queue->jobs);

      /* Never hold the mutex across user code. */
      job_queue_unlock (queue);

      job->job (job->data);
      g_free (job);

      job_queue_lock (queue);
    }

  queue->ref_count--;
  queue->in_executor = FALSE;

  if (queue->ref_count == 0)
    do_free = TRUE;

  job_queue_unlock (queue);

  /* The mutex is part of what gets destroyed, so free after unlocking. */
  if (do_free)
    delete_queue (queue);
}

/*
 * Append a job that will call func (data), scheduling the queue on its
 * executor if it is not already scheduled.
 *
 * Returns an opaque handle that can be passed to job_queue_remove().  The
 * job is freed by run_queue() once it has executed, so the handle must not
 * be dereferenced by the caller.
 */
gpointer
job_queue_add (JobQueue    *queue,
               ExecutorJob  func,
               gpointer     data)
{
  Job *job;

  job_queue_lock (queue);

  job = job_new (queue, func, data);
  g_queue_push_tail (queue->jobs, job);

  if (!queue->in_executor)
    {
      queue->in_executor = TRUE;
      queue->ref_count++;   /* reference owned by run_queue() */

      executor_add_job (queue->executor, run_queue, queue);
    }

  job_queue_unlock (queue);

  return job;
}

/*
 * Cancel a job that is still pending on `queue`.
 *
 * `job_id` is a handle returned by job_queue_add().  It is only compared by
 * address against the pending jobs and is never dereferenced before it has
 * been found in the queue, so passing the handle of a job that has already
 * run (and been freed) is harmless: nothing is found and nothing happens.
 * Note that a stale handle whose address has since been reused by a newer
 * pending job would match that newer job.
 */
void
job_queue_remove (JobQueue *queue,
                  gpointer  job_id)
{
  Job *job = job_id;
  GList *link;

  /* Lock the queue we actually search; reading job->queue here could touch
   * freed memory if the job has already been executed. */
  job_queue_lock (queue);

  link = g_queue_find (queue->jobs, job);
  if (link)
    {
      g_queue_delete_link (queue->jobs, link);
      g_free (job);
    }

  job_queue_unlock (queue);
}

/*
 * Drop the reference obtained from job_queue_new(), discarding any jobs
 * that have not run yet.
 *
 * If the queue is currently scheduled or running on the executor a warning
 * is emitted and nothing is released.
 */
void
job_queue_free (JobQueue *queue)
{
  gboolean do_free = FALSE;

  job_queue_lock (queue);

  if (queue->in_executor)
    {
      g_warning ("Trying to free a running queue");
      job_queue_unlock (queue);
      return;
    }

  /* Free the pending Job structures.  The GQueue container itself is
   * released by delete_queue(); freeing it here as well would be a
   * double free. */
  while (!g_queue_is_empty (queue->jobs))
    {
      Job *job = g_queue_pop_head (queue->jobs);

      g_free (job);
    }

  queue->ref_count--;
  if (queue->ref_count == 0)
    do_free = TRUE;

  job_queue_unlock (queue);

  if (do_free)
    delete_queue (queue);
}