diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2015-03-17 18:31:15 +0100 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2015-03-18 17:21:45 +0100 |
commit | 5157cb91ae18d05de99badb23d76092aada645ec (patch) | |
tree | f62fb38324a5770ca64711e20af890246d1b9bca | |
parent | f52f442bc2aa31ecb87b4d359a162834ea0c77eb (diff) |
task: Implement scheduleable tasks
This allows us the have a custom thread pool with a limited number of threads
used for a set of elements. E.g. it allows to run 10 queues with only 2
threads.
Also implement support for handling that inside the queue element.
And implement a simple test application using queues.
-rw-r--r-- | gst/gsttask.c | 142 | ||||
-rw-r--r-- | gst/gsttask.h | 6 | ||||
-rw-r--r-- | gst/gsttaskpool.c | 132 | ||||
-rw-r--r-- | gst/gsttaskpool.h | 7 | ||||
-rw-r--r-- | plugins/elements/gstqueue.c | 103 | ||||
-rw-r--r-- | plugins/elements/gstqueue.h | 3 | ||||
-rw-r--r-- | tests/examples/streams/.gitignore | 1 | ||||
-rw-r--r-- | tests/examples/streams/Makefile.am | 6 | ||||
-rw-r--r-- | tests/examples/streams/schedule-tasks.c | 179 |
9 files changed, 526 insertions, 53 deletions
diff --git a/gst/gsttask.c b/gst/gsttask.c index b0b96e30c..68b46e138 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -1,6 +1,7 @@ /* GStreamer * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> * 2005 Wim Taymans <wim@fluendo.com> + * 2015 Sebastian Dröge <sebastian@centricular.com> * * gsttask.c: Streaming tasks * @@ -105,6 +106,23 @@ struct _GstTaskPrivate /* configured pool */ GstTaskPool *pool; + /* If the creator of the task wants to schedule / + * unschedule the task instead of having it run + * all the time. Once this is TRUE, the pool can't + * be changed anymore! + * + * This can only be TRUE if the pool is not the + * default pool because we want explicit usage + * of this from the application by providing a + * custom task pool + */ + gboolean scheduleable; + /* if the task should be scheduled again on the + * next opportunity. Defaults to TRUE until + * unschedule is called + */ + gboolean should_schedule; + /* remember the pool and id that is currently running. */ gpointer id; GstTaskPool *pool_id; @@ -143,6 +161,7 @@ SetThreadName (DWORD dwThreadID, LPCSTR szThreadName) static void gst_task_finalize (GObject * object); static void gst_task_func (GstTask * task); +static gboolean start_task (GstTask * task); #define _do_init \ { \ @@ -181,6 +200,9 @@ gst_task_init (GstTask * task) task->priv->pool = gst_object_ref (klass->pool); + task->priv->scheduleable = FALSE; + task->priv->should_schedule = TRUE; + /* clear floating flag */ gst_object_ref_sink (task); } @@ -270,7 +292,8 @@ gst_task_func (GstTask * task) * mark our state running so that nobody can mess with * the mutex. */ GST_OBJECT_LOCK (task); - if (GET_TASK_STATE (task) == GST_TASK_STOPPED) + if (GET_TASK_STATE (task) == GST_TASK_STOPPED || + (priv->scheduleable && GET_TASK_STATE (task) == GST_TASK_PAUSED)) goto exit; lock = GST_TASK_GET_LOCK (task); if (G_UNLIKELY (lock == NULL)) @@ -279,7 +302,7 @@ gst_task_func (GstTask * task) GST_OBJECT_UNLOCK (task); /* fire the enter_func callback when we need to */ - if (priv->enter_func) + if (!priv->scheduleable && priv->enter_func) priv->enter_func (task, tself, priv->enter_user_data); /* locking order is TASK_LOCK, LOCK */ @@ -289,7 +312,15 @@ gst_task_func (GstTask * task) while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) { GST_OBJECT_LOCK (task); - while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) { + + if (G_UNLIKELY (priv->scheduleable + && GST_TASK_STATE (task) == GST_TASK_PAUSED)) { + GST_OBJECT_UNLOCK (task); + break; + } + + while (G_UNLIKELY (!priv->scheduleable + && GST_TASK_STATE (task) == GST_TASK_PAUSED)) { g_rec_mutex_unlock (lock); GST_TASK_SIGNAL (task); @@ -310,6 +341,9 @@ gst_task_func (GstTask * task) } task->func (task->user_data); + + if (priv->scheduleable) + break; } g_rec_mutex_unlock (lock); @@ -318,21 +352,27 @@ gst_task_func (GstTask * task) task->thread = NULL; exit: - if (priv->leave_func) { + if (!priv->scheduleable && priv->leave_func) { /* fire the leave_func callback when we need to. We need to do this before * we signal the task and with the task lock released. */ GST_OBJECT_UNLOCK (task); priv->leave_func (task, tself, priv->leave_user_data); GST_OBJECT_LOCK (task); } - /* now we allow messing with the lock again by setting the running flag to - * %FALSE. Together with the SIGNAL this is the sign for the _join() to - * complete. - * Note that we still have not dropped the final ref on the task. We could - * check here if there is a pending join() going on and drop the last ref - * before releasing the lock as we can be sure that a ref is held by the - * caller of the join(). */ - task->running = FALSE; + + if (priv->scheduleable && priv->should_schedule + && GET_TASK_STATE (task) == GST_TASK_STARTED) { + start_task (task); + } else { + /* now we allow messing with the lock again by setting the running flag to + * %FALSE. Together with the SIGNAL this is the sign for the _join() to + * complete. + * Note that we still have not dropped the final ref on the task. We could + * check here if there is a pending join() going on and drop the last ref + * before releasing the lock as we can be sure that a ref is held by the + * caller of the join(). */ + task->running = FALSE; + } GST_TASK_SIGNAL (task); GST_OBJECT_UNLOCK (task); @@ -481,7 +521,7 @@ gst_task_get_pool (GstTask * task) void gst_task_set_pool (GstTask * task, GstTaskPool * pool) { - GstTaskPool *old; + GstTaskPool *old = NULL; GstTaskPrivate *priv; g_return_if_fail (GST_IS_TASK (task)); @@ -491,10 +531,14 @@ gst_task_set_pool (GstTask * task, GstTaskPool * pool) GST_OBJECT_LOCK (task); if (priv->pool != pool) { - old = priv->pool; - priv->pool = gst_object_ref (pool); - } else - old = NULL; + if (priv->scheduleable) { + g_warning ("%s: Changing pool not possible for scheduleable tasks", + GST_OBJECT_NAME (task)); + } else { + old = priv->pool; + priv->pool = gst_object_ref (pool); + } + } GST_OBJECT_UNLOCK (task); if (old) @@ -610,6 +654,8 @@ start_task (GstTask * task) priv = task->priv; + GST_DEBUG_OBJECT (task, "Starting task"); + /* new task, We ref before so that it remains alive while * the thread is running. */ gst_object_ref (task); @@ -617,6 +663,8 @@ start_task (GstTask * task) * and exit the task function. */ task->running = TRUE; + if (priv->pool_id) + gst_object_unref (priv->pool_id); /* push on the thread pool, we remember the original pool because the user * could change it later on and then we join to the wrong pool. */ priv->pool_id = gst_object_ref (priv->pool); @@ -667,11 +715,14 @@ gst_task_set_state (GstTask * task, GstTaskState state) old = GET_TASK_STATE (task); if (old != state) { SET_TASK_STATE (task, state); + switch (old) { case GST_TASK_STOPPED: /* If the task already has a thread scheduled we don't have to do * anything. */ - if (G_UNLIKELY (!task->running)) + if (G_UNLIKELY (!task->running) && + (!task->priv->scheduleable || (task->priv->should_schedule + && state == GST_TASK_STARTED))) res = start_task (task); break; case GST_TASK_PAUSED: @@ -829,3 +880,58 @@ joining_self: return FALSE; } } + +gboolean +gst_task_get_scheduleable (GstTask * task) +{ + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + return task->priv->scheduleable; +} + +gboolean +gst_task_set_scheduleable (GstTask * task, gboolean scheduleable) +{ + GstTaskClass *klass; + + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + klass = GST_TASK_GET_CLASS (task); + GST_OBJECT_LOCK (task); + if (task->priv->pool == klass->pool) { + GST_OBJECT_UNLOCK (task); + return FALSE; + } + + task->priv->scheduleable = scheduleable; + GST_OBJECT_UNLOCK (task); + + return scheduleable; +} + +void +gst_task_schedule (GstTask * task) +{ + g_return_if_fail (GST_IS_TASK (task)); + + GST_OBJECT_LOCK (task); + GST_DEBUG_OBJECT (task, "Scheduling task"); + if (!task->running && task->priv->scheduleable + && GET_TASK_STATE (task) == GST_TASK_STARTED) { + GST_DEBUG_OBJECT (task, "Task needs to be scheduled"); + task->priv->should_schedule = TRUE; + start_task (task); + } + GST_OBJECT_UNLOCK (task); +} + +void +gst_task_unschedule (GstTask * task) +{ + g_return_if_fail (GST_IS_TASK (task)); + + GST_OBJECT_LOCK (task); + GST_DEBUG_OBJECT (task, "Unscheduling task"); + task->priv->should_schedule = FALSE; + GST_OBJECT_UNLOCK (task); +} diff --git a/gst/gsttask.h b/gst/gsttask.h index 85117fff3..13ec924db 100644 --- a/gst/gsttask.h +++ b/gst/gsttask.h @@ -194,6 +194,12 @@ gboolean gst_task_pause (GstTask *task); gboolean gst_task_join (GstTask *task); +gboolean gst_task_get_scheduleable(GstTask *task); +gboolean gst_task_set_scheduleable(GstTask *task, gboolean scheduleable); + +void gst_task_schedule (GstTask *task); +void gst_task_unschedule (GstTask *task); + G_END_DECLS #endif /* __GST_TASK_H__ */ diff --git a/gst/gsttaskpool.c b/gst/gsttaskpool.c index cbbe1b7ef..347c2ec5e 100644 --- a/gst/gsttaskpool.c +++ b/gst/gsttaskpool.c @@ -1,5 +1,6 @@ /* GStreamer * Copyright (C) 2009 Wim Taymans <wim.taymans@gmail.com> + * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com> * * gsttaskpool.c: Pool for streaming threads * @@ -38,15 +39,26 @@ GST_DEBUG_CATEGORY_STATIC (taskpool_debug); #define GST_CAT_DEFAULT (taskpool_debug) -#ifndef GST_DISABLE_GST_DEBUG +struct _GstTaskPoolPrivate +{ + gint need_schedule_thread; + GMainContext *schedule_context; + GMainLoop *schedule_loop; + GThread *schedule_thread; + GMutex schedule_lock; + GCond schedule_cond; +}; + static void gst_task_pool_finalize (GObject * object); -#endif #define _do_init \ { \ GST_DEBUG_CATEGORY_INIT (taskpool_debug, "taskpool", 0, "Thread pool"); \ } +#define GST_TASK_POOL_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK_POOL, GstTaskPoolPrivate)) + G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init); typedef struct @@ -126,9 +138,9 @@ gst_task_pool_class_init (GstTaskPoolClass * klass) gobject_class = (GObjectClass *) klass; gsttaskpool_class = (GstTaskPoolClass *) klass; -#ifndef GST_DISABLE_GST_DEBUG + g_type_class_add_private (gobject_class, sizeof (GstTaskPoolPrivate)); + gobject_class->finalize = gst_task_pool_finalize; -#endif gsttaskpool_class->prepare = default_prepare; gsttaskpool_class->cleanup = default_cleanup; @@ -139,19 +151,32 @@ gst_task_pool_class_init (GstTaskPoolClass * klass) static void gst_task_pool_init (GstTaskPool * pool) { + pool->priv = GST_TASK_POOL_GET_PRIVATE (pool); + + pool->priv->need_schedule_thread = 0; + pool->priv->schedule_context = NULL; + pool->priv->schedule_loop = NULL; + pool->priv->schedule_thread = NULL; + g_mutex_init (&pool->priv->schedule_lock); + g_cond_init (&pool->priv->schedule_cond); + /* clear floating flag */ gst_object_ref_sink (pool); } -#ifndef GST_DISABLE_GST_DEBUG static void gst_task_pool_finalize (GObject * object) { - GST_DEBUG ("taskpool %p finalize", object); + GstTaskPool *pool = GST_TASK_POOL (object); + + GST_DEBUG_OBJECT (pool, "taskpool finalize"); + + g_mutex_clear (&pool->priv->schedule_lock); + g_cond_clear (&pool->priv->schedule_cond); G_OBJECT_CLASS (gst_task_pool_parent_class)->finalize (object); } -#endif + /** * gst_task_pool_new: * @@ -271,6 +296,99 @@ gst_task_pool_join (GstTaskPool * pool, gpointer id) klass->join (pool, id); } +static gboolean +main_loop_running_cb (gpointer user_data) +{ + GstTaskPool *pool = GST_TASK_POOL (user_data); + + g_mutex_lock (&pool->priv->schedule_lock); + g_cond_signal (&pool->priv->schedule_cond); + g_mutex_unlock (&pool->priv->schedule_lock); + + return G_SOURCE_REMOVE; +} + +static gpointer +gst_task_pool_schedule_func (gpointer data) +{ + GstTaskPool *pool = GST_TASK_POOL (data); + GSource *source; + + source = g_idle_source_new (); + g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, pool, + NULL); + g_source_attach (source, pool->priv->schedule_context); + g_source_unref (source); + + g_main_loop_run (pool->priv->schedule_loop); + + return NULL; +} + +gboolean +gst_task_pool_need_schedule_thread (GstTaskPool * pool, gboolean needed) +{ + gboolean ret; + + g_return_val_if_fail (GST_IS_TASK_POOL (pool), FALSE); + g_return_val_if_fail (needed || pool->priv->need_schedule_thread > 0, FALSE); + + /* We don't allow this for the default pool */ + if (pool == gst_task_pool_get_default ()) { + gst_object_unref (pool); + return FALSE; + } + + g_mutex_lock (&pool->priv->schedule_lock); + if (needed) { + ret = TRUE; + if (pool->priv->need_schedule_thread == 0) { + pool->priv->schedule_context = g_main_context_new (); + pool->priv->schedule_loop = + g_main_loop_new (pool->priv->schedule_context, FALSE); + + pool->priv->schedule_thread = + g_thread_new (GST_OBJECT_NAME (pool), gst_task_pool_schedule_func, + pool); + + g_cond_wait (&pool->priv->schedule_cond, &pool->priv->schedule_lock); + } + pool->priv->need_schedule_thread++; + } else { + ret = FALSE; + pool->priv->need_schedule_thread--; + if (pool->priv->need_schedule_thread == 0) { + g_main_loop_quit (pool->priv->schedule_loop); + g_thread_join (pool->priv->schedule_thread); + g_main_loop_unref (pool->priv->schedule_loop); + pool->priv->schedule_loop = NULL; + g_main_context_unref (pool->priv->schedule_context); + pool->priv->schedule_context = NULL; + pool->priv->schedule_thread = NULL; + } + } + g_mutex_unlock (&pool->priv->schedule_lock); + + return ret; +} + +GMainContext * +gst_task_pool_get_schedule_context (GstTaskPool * pool) +{ + GMainContext *context; + + g_return_val_if_fail (GST_IS_TASK_POOL (pool), NULL); + g_return_val_if_fail (pool->priv->need_schedule_thread > 0, NULL); + + g_mutex_lock (&pool->priv->schedule_lock); + context = pool->priv->schedule_context; + if (context) + g_main_context_ref (context); + g_mutex_unlock (&pool->priv->schedule_lock); + + return context; +} + GstTaskPool * gst_task_pool_get_default (void) { diff --git a/gst/gsttaskpool.h b/gst/gsttaskpool.h index 82aa5baa3..859ec6fc1 100644 --- a/gst/gsttaskpool.h +++ b/gst/gsttaskpool.h @@ -37,6 +37,7 @@ G_BEGIN_DECLS typedef struct _GstTaskPool GstTaskPool; typedef struct _GstTaskPoolClass GstTaskPoolClass; +typedef struct _GstTaskPoolPrivate GstTaskPoolPrivate; /** * GstTaskPoolFunction: @@ -57,7 +58,8 @@ struct _GstTaskPool { /*< private >*/ GThreadPool *pool; - gpointer _gst_reserved[GST_PADDING]; + GstTaskPoolPrivate *priv; + gpointer _gst_reserved[GST_PADDING-1]; }; /** @@ -98,6 +100,9 @@ void gst_task_pool_cleanup (GstTaskPool *pool); GstTaskPool * gst_task_pool_get_default (void); +gboolean gst_task_pool_need_schedule_thread (GstTaskPool *pool, gboolean needed); +GMainContext * gst_task_pool_get_schedule_context (GstTaskPool *pool); + G_END_DECLS #endif /* __GST_TASK_POOL_H__ */ diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 02055cdbe..9d6dbde17 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -250,6 +250,31 @@ queue_leaky_get_type (void) static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; +static gboolean +gst_queue_post_message (GstElement * element, GstMessage * msg) +{ + GstQueue *queue = GST_QUEUE (element); + gboolean ret; + + gst_message_ref (msg); + ret = GST_ELEMENT_CLASS (parent_class)->post_message (element, msg); + + if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_STREAM_STATUS) { + GstStreamStatusType type; + + gst_message_parse_stream_status (msg, &type, NULL); + if (type == GST_STREAM_STATUS_TYPE_CREATE) { + queue->schedule_task = + gst_task_get_scheduleable (GST_PAD_TASK (queue->srcpad)); + GST_DEBUG_OBJECT (queue, "Scheduling task %d", queue->schedule_task); + } + } + + gst_message_unref (msg); + + return ret; +} + static void gst_queue_class_init (GstQueueClass * klass) { @@ -404,6 +429,8 @@ gst_queue_class_init (GstQueueClass * klass) gobject_class->finalize = gst_queue_finalize; + gstelement_class->post_message = gst_queue_post_message; + gst_element_class_set_static_metadata (gstelement_class, "Queue", "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>"); @@ -726,7 +753,10 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) qitem->is_query = FALSE; qitem->size = bsize; gst_queue_array_push_tail (queue->queue, qitem); - GST_QUEUE_SIGNAL_ADD (queue); + if (queue->schedule_task) + gst_task_schedule (GST_PAD_TASK (queue->srcpad)); + else + GST_QUEUE_SIGNAL_ADD (queue); } static gboolean @@ -760,7 +790,10 @@ gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item) qitem->is_query = FALSE; qitem->size = bsize; gst_queue_array_push_tail (queue->queue, qitem); - GST_QUEUE_SIGNAL_ADD (queue); + if (queue->schedule_task) + gst_task_schedule (GST_PAD_TASK (queue->srcpad)); + else + GST_QUEUE_SIGNAL_ADD (queue); } static inline void @@ -804,7 +837,10 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) qitem->item = item; qitem->is_query = FALSE; gst_queue_array_push_tail (queue->queue, qitem); - GST_QUEUE_SIGNAL_ADD (queue); + if (queue->schedule_task) + gst_task_schedule (GST_PAD_TASK (queue->srcpad)); + else + GST_QUEUE_SIGNAL_ADD (queue); } /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */ @@ -915,7 +951,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_FLUSHING; /* unblock the loop and chain functions */ - GST_QUEUE_SIGNAL_ADD (queue); + if (queue->schedule_task) + gst_task_schedule (GST_PAD_TASK (queue->srcpad)); + else + GST_QUEUE_SIGNAL_ADD (queue); GST_QUEUE_SIGNAL_DEL (queue); queue->last_query = FALSE; g_cond_signal (&queue->query_handled); @@ -1023,7 +1062,10 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) qitem->item = GST_MINI_OBJECT_CAST (query); qitem->is_query = TRUE; gst_queue_array_push_tail (queue->queue, qitem); - GST_QUEUE_SIGNAL_ADD (queue); + if (queue->schedule_task) + gst_task_schedule (GST_PAD_TASK (queue->srcpad)); + else + GST_QUEUE_SIGNAL_ADD (queue); g_cond_wait (&queue->query_handled, &queue->qlock); if (queue->srcresult != GST_FLOW_OK) goto out_flushing; @@ -1453,38 +1495,47 @@ gst_queue_loop (GstPad * pad) { GstQueue *queue; GstFlowReturn ret; + gboolean empty; queue = (GstQueue *) GST_PAD_PARENT (pad); /* have to lock for thread-safety */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - while (gst_queue_is_empty (queue)) { - GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty"); - if (!queue->silent) { - GST_QUEUE_MUTEX_UNLOCK (queue); - g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0); - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - } + if (queue->schedule_task) { + empty = gst_queue_is_empty (queue); + if (empty) + gst_task_unschedule (GST_PAD_TASK (pad)); + } else { + while ((empty = gst_queue_is_empty (queue))) { + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty"); + if (!queue->silent) { + GST_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0); + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + } - /* we recheck, the signal could have changed the thresholds */ - while (gst_queue_is_empty (queue)) { - GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); - } + /* we recheck, the signal could have changed the thresholds */ + while ((empty = gst_queue_is_empty (queue))) { + GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); + } - GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty"); - if (!queue->silent) { - GST_QUEUE_MUTEX_UNLOCK (queue); - g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0); - g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0); - GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty"); + if (!queue->silent) { + GST_QUEUE_MUTEX_UNLOCK (queue); + g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0); + g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0); + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + } } } - ret = gst_queue_push_one (queue); - queue->srcresult = ret; - if (ret != GST_FLOW_OK) - goto out_flushing; + if (!empty) { + ret = gst_queue_push_one (queue); + queue->srcresult = ret; + if (ret != GST_FLOW_OK) + goto out_flushing; + } GST_QUEUE_MUTEX_UNLOCK (queue); diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 513457803..e853f2dac 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -137,6 +137,9 @@ struct _GstQueue { gboolean last_query; gboolean flush_on_eos; /* flush on EOS */ + + /* TRUE if we schedule/unschedule tasks */ + gboolean schedule_task; }; struct _GstQueueClass { diff --git a/tests/examples/streams/.gitignore b/tests/examples/streams/.gitignore index d8cfe30d0..7a643dd3e 100644 --- a/tests/examples/streams/.gitignore +++ b/tests/examples/streams/.gitignore @@ -1,5 +1,6 @@ stream-status rtpool-test +schedule-tasks *.bb *.bbg *.da diff --git a/tests/examples/streams/Makefile.am b/tests/examples/streams/Makefile.am index e1f87acaa..22541c961 100644 --- a/tests/examples/streams/Makefile.am +++ b/tests/examples/streams/Makefile.am @@ -1,4 +1,4 @@ -noinst_PROGRAMS = stream-status +noinst_PROGRAMS = schedule-tasks stream-status if HAVE_PTHREAD noinst_PROGRAMS += rtpool-test endif @@ -11,4 +11,8 @@ rtpool_test_SOURCES = rtpool-test.c testrtpool.h testrtpool.c rtpool_test_LDADD = $(GST_OBJ_LIBS) $(PTHREAD_LIBS) rtpool_test_CFLAGS = $(GST_OBJ_CFLAGS) $(PTHREAD_CFLAGS) +schedule_tasks_SOURCES = schedule-tasks.c +schedule_tasks_LDADD = $(GST_OBJ_LIBS) +schedule_tasks_CFLAGS = $(GST_OBJ_CFLAGS) + EXTRA_DIST = rtpool-test.c testrtpool.h testrtpool.c diff --git a/tests/examples/streams/schedule-tasks.c b/tests/examples/streams/schedule-tasks.c new file mode 100644 index 000000000..bdc768991 --- /dev/null +++ b/tests/examples/streams/schedule-tasks.c @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2015 Sebastian Dröge <sebastian@centricular.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +#include <gst/gst.h> + +static GMainLoop *loop; +static GstElement *pipeline; +#define N_SINKS 10 +static GstElement *src, *tee, *queue[N_SINKS], *sink[N_SINKS]; + +static GstTaskPool *pool; + +static gboolean +message_cb (GstBus * bus, GstMessage * message, gpointer user_data) +{ + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ERROR:{ + GError *err = NULL; + gchar *name, *debug = NULL; + + name = gst_object_get_path_string (message->src); + gst_message_parse_error (message, &err, &debug); + + g_printerr ("ERROR: from element %s: %s\n", name, err->message); + if (debug != NULL) + g_printerr ("Additional debug info:\n%s\n", debug); + + g_error_free (err); + g_free (debug); + g_free (name); + + g_main_loop_quit (loop); + break; + } + case GST_MESSAGE_WARNING:{ + GError *err = NULL; + gchar *name, *debug = NULL; + + name = gst_object_get_path_string (message->src); + gst_message_parse_warning (message, &err, &debug); + + g_printerr ("ERROR: from element %s: %s\n", name, err->message); + if (debug != NULL) + g_printerr ("Additional debug info:\n%s\n", debug); + + g_error_free (err); + g_free (debug); + g_free (name); + break; + } + case GST_MESSAGE_EOS: + g_print ("Got EOS\n"); + g_main_loop_quit (loop); + break; + default: + break; + } + + return TRUE; +} + +static void +stream_status_cb (GstBus * bus, GstMessage * msg, gpointer user_data) +{ + GstStreamStatusType type; + + gst_message_parse_stream_status (msg, &type, NULL); + if (type == GST_STREAM_STATUS_TYPE_CREATE) { + GstTask *task = GST_PAD_TASK (GST_MESSAGE_SRC (msg)); + gboolean ret; + + gst_task_set_pool (task, pool); + ret = gst_task_set_scheduleable (task, TRUE); + g_assert (ret); + } +} + +static void +handoff_cb (GstElement * sink, GstBuffer * buf, GstPad * pad, + gpointer user_data) +{ + g_print ("%s: handoff thread %p timestamp %" GST_TIME_FORMAT "\n", + GST_OBJECT_NAME (sink), g_thread_self (), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); +} + +int +main (int argc, char **argv) +{ + GstBus *bus; + gint i; + + gst_init (&argc, &argv); + + pool = gst_task_pool_new_full (2, FALSE); + gst_task_pool_prepare (pool, NULL); + + pipeline = gst_pipeline_new (NULL); + src = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src, "is-live", TRUE, "format", GST_FORMAT_TIME, "num-buffers", + 1000, NULL); + tee = gst_element_factory_make ("tee", NULL); + + gst_bin_add_many (GST_BIN (pipeline), src, tee, NULL); + gst_element_link_pads (src, "src", tee, "sink"); + + for (i = 0; i < N_SINKS; i++) { + GstPad *srcpad, *sinkpad; + + queue[i] = gst_element_factory_make ("queue", NULL); + sink[i] = gst_element_factory_make ("fakesink", NULL); + + gst_bin_add_many (GST_BIN (pipeline), queue[i], sink[i], NULL); + + g_object_set (sink[i], "async", FALSE, "signal-handoffs", TRUE, NULL); + g_signal_connect (sink[i], "handoff", G_CALLBACK (handoff_cb), NULL); + + srcpad = gst_element_get_request_pad (tee, "src_%u"); + sinkpad = gst_element_get_static_pad (queue[i], "sink"); + gst_pad_link (srcpad, sinkpad); + gst_object_unref (srcpad); + gst_object_unref (sinkpad); + + gst_element_link_pads (queue[i], "src", sink[i], "sink"); + } + + if (!pipeline || !src || !tee) { + g_error ("Failed to create elements"); + return -1; + } + + loop = g_main_loop_new (NULL, FALSE); + + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + gst_bus_add_signal_watch (bus); + g_signal_connect (G_OBJECT (bus), "message", G_CALLBACK (message_cb), NULL); + gst_bus_enable_sync_message_emission (bus); + g_signal_connect (G_OBJECT (bus), "sync-message::stream-status", + G_CALLBACK (stream_status_cb), NULL); + gst_object_unref (GST_OBJECT (bus)); + + if (gst_element_set_state (pipeline, + GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { + g_error ("Failed to go into PLAYING state"); + return -3; + } + + g_main_loop_run (loop); + + gst_element_set_state (pipeline, GST_STATE_NULL); + + g_main_loop_unref (loop); + gst_object_unref (pipeline); + + gst_task_pool_cleanup (pool); + gst_object_unref (pool); + + return 0; +} |