summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2015-03-17 18:31:15 +0100
committerSebastian Dröge <sebastian@centricular.com>2015-03-18 17:21:45 +0100
commit5157cb91ae18d05de99badb23d76092aada645ec (patch)
treef62fb38324a5770ca64711e20af890246d1b9bca
parentf52f442bc2aa31ecb87b4d359a162834ea0c77eb (diff)
task: Implement scheduleable tasks
This allows us to have a custom thread pool with a limited number of threads used for a set of elements. E.g. it allows running 10 queues with only 2 threads. Also implement support for handling this inside the queue element, and add a simple test application using queues.
-rw-r--r--gst/gsttask.c142
-rw-r--r--gst/gsttask.h6
-rw-r--r--gst/gsttaskpool.c132
-rw-r--r--gst/gsttaskpool.h7
-rw-r--r--plugins/elements/gstqueue.c103
-rw-r--r--plugins/elements/gstqueue.h3
-rw-r--r--tests/examples/streams/.gitignore1
-rw-r--r--tests/examples/streams/Makefile.am6
-rw-r--r--tests/examples/streams/schedule-tasks.c179
9 files changed, 526 insertions, 53 deletions
diff --git a/gst/gsttask.c b/gst/gsttask.c
index b0b96e30c..68b46e138 100644
--- a/gst/gsttask.c
+++ b/gst/gsttask.c
@@ -1,6 +1,7 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2005 Wim Taymans <wim@fluendo.com>
+ * 2015 Sebastian Dröge <sebastian@centricular.com>
*
* gsttask.c: Streaming tasks
*
@@ -105,6 +106,23 @@ struct _GstTaskPrivate
/* configured pool */
GstTaskPool *pool;
+ /* If the creator of the task wants to schedule /
+ * unschedule the task instead of having it run
+ * all the time. Once this is TRUE, the pool can't
+ * be changed anymore!
+ *
+ * This can only be TRUE if the pool is not the
+ * default pool because we want explicit usage
+ * of this from the application by providing a
+ * custom task pool
+ */
+ gboolean scheduleable;
+ /* if the task should be scheduled again on the
+ * next opportunity. Defaults to TRUE until
+ * unschedule is called
+ */
+ gboolean should_schedule;
+
/* remember the pool and id that is currently running. */
gpointer id;
GstTaskPool *pool_id;
@@ -143,6 +161,7 @@ SetThreadName (DWORD dwThreadID, LPCSTR szThreadName)
static void gst_task_finalize (GObject * object);
static void gst_task_func (GstTask * task);
+static gboolean start_task (GstTask * task);
#define _do_init \
{ \
@@ -181,6 +200,9 @@ gst_task_init (GstTask * task)
task->priv->pool = gst_object_ref (klass->pool);
+ task->priv->scheduleable = FALSE;
+ task->priv->should_schedule = TRUE;
+
/* clear floating flag */
gst_object_ref_sink (task);
}
@@ -270,7 +292,8 @@ gst_task_func (GstTask * task)
* mark our state running so that nobody can mess with
* the mutex. */
GST_OBJECT_LOCK (task);
- if (GET_TASK_STATE (task) == GST_TASK_STOPPED)
+ if (GET_TASK_STATE (task) == GST_TASK_STOPPED ||
+ (priv->scheduleable && GET_TASK_STATE (task) == GST_TASK_PAUSED))
goto exit;
lock = GST_TASK_GET_LOCK (task);
if (G_UNLIKELY (lock == NULL))
@@ -279,7 +302,7 @@ gst_task_func (GstTask * task)
GST_OBJECT_UNLOCK (task);
/* fire the enter_func callback when we need to */
- if (priv->enter_func)
+ if (!priv->scheduleable && priv->enter_func)
priv->enter_func (task, tself, priv->enter_user_data);
/* locking order is TASK_LOCK, LOCK */
@@ -289,7 +312,15 @@ gst_task_func (GstTask * task)
while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
GST_OBJECT_LOCK (task);
- while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
+
+ if (G_UNLIKELY (priv->scheduleable
+ && GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
+ GST_OBJECT_UNLOCK (task);
+ break;
+ }
+
+ while (G_UNLIKELY (!priv->scheduleable
+ && GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
g_rec_mutex_unlock (lock);
GST_TASK_SIGNAL (task);
@@ -310,6 +341,9 @@ gst_task_func (GstTask * task)
}
task->func (task->user_data);
+
+ if (priv->scheduleable)
+ break;
}
g_rec_mutex_unlock (lock);
@@ -318,21 +352,27 @@ gst_task_func (GstTask * task)
task->thread = NULL;
exit:
- if (priv->leave_func) {
+ if (!priv->scheduleable && priv->leave_func) {
/* fire the leave_func callback when we need to. We need to do this before
* we signal the task and with the task lock released. */
GST_OBJECT_UNLOCK (task);
priv->leave_func (task, tself, priv->leave_user_data);
GST_OBJECT_LOCK (task);
}
- /* now we allow messing with the lock again by setting the running flag to
- * %FALSE. Together with the SIGNAL this is the sign for the _join() to
- * complete.
- * Note that we still have not dropped the final ref on the task. We could
- * check here if there is a pending join() going on and drop the last ref
- * before releasing the lock as we can be sure that a ref is held by the
- * caller of the join(). */
- task->running = FALSE;
+
+ if (priv->scheduleable && priv->should_schedule
+ && GET_TASK_STATE (task) == GST_TASK_STARTED) {
+ start_task (task);
+ } else {
+ /* now we allow messing with the lock again by setting the running flag to
+ * %FALSE. Together with the SIGNAL this is the sign for the _join() to
+ * complete.
+ * Note that we still have not dropped the final ref on the task. We could
+ * check here if there is a pending join() going on and drop the last ref
+ * before releasing the lock as we can be sure that a ref is held by the
+ * caller of the join(). */
+ task->running = FALSE;
+ }
GST_TASK_SIGNAL (task);
GST_OBJECT_UNLOCK (task);
@@ -481,7 +521,7 @@ gst_task_get_pool (GstTask * task)
void
gst_task_set_pool (GstTask * task, GstTaskPool * pool)
{
- GstTaskPool *old;
+ GstTaskPool *old = NULL;
GstTaskPrivate *priv;
g_return_if_fail (GST_IS_TASK (task));
@@ -491,10 +531,14 @@ gst_task_set_pool (GstTask * task, GstTaskPool * pool)
GST_OBJECT_LOCK (task);
if (priv->pool != pool) {
- old = priv->pool;
- priv->pool = gst_object_ref (pool);
- } else
- old = NULL;
+ if (priv->scheduleable) {
+ g_warning ("%s: Changing pool not possible for scheduleable tasks",
+ GST_OBJECT_NAME (task));
+ } else {
+ old = priv->pool;
+ priv->pool = gst_object_ref (pool);
+ }
+ }
GST_OBJECT_UNLOCK (task);
if (old)
@@ -610,6 +654,8 @@ start_task (GstTask * task)
priv = task->priv;
+ GST_DEBUG_OBJECT (task, "Starting task");
+
/* new task, We ref before so that it remains alive while
* the thread is running. */
gst_object_ref (task);
@@ -617,6 +663,8 @@ start_task (GstTask * task)
* and exit the task function. */
task->running = TRUE;
+ if (priv->pool_id)
+ gst_object_unref (priv->pool_id);
/* push on the thread pool, we remember the original pool because the user
* could change it later on and then we join to the wrong pool. */
priv->pool_id = gst_object_ref (priv->pool);
@@ -667,11 +715,14 @@ gst_task_set_state (GstTask * task, GstTaskState state)
old = GET_TASK_STATE (task);
if (old != state) {
SET_TASK_STATE (task, state);
+
switch (old) {
case GST_TASK_STOPPED:
/* If the task already has a thread scheduled we don't have to do
* anything. */
- if (G_UNLIKELY (!task->running))
+ if (G_UNLIKELY (!task->running) &&
+ (!task->priv->scheduleable || (task->priv->should_schedule
+ && state == GST_TASK_STARTED)))
res = start_task (task);
break;
case GST_TASK_PAUSED:
@@ -829,3 +880,58 @@ joining_self:
return FALSE;
}
}
+
+/* Returns whether @task is in scheduleable mode, i.e. it runs a single
+ * iteration per scheduling and must be re-armed with gst_task_schedule().
+ * Reads priv->scheduleable without the object lock; a momentarily stale
+ * value is harmless for this boolean query. */
+gboolean
+gst_task_get_scheduleable (GstTask * task)
+{
+  g_return_val_if_fail (GST_IS_TASK (task), FALSE);
+
+  return task->priv->scheduleable;
+}
+
+/* Enables/disables scheduleable mode for @task.  Refused (returns FALSE)
+ * while the task still uses the default class pool: scheduleable mode must
+ * be opted into explicitly by setting a custom pool first (see the comment
+ * on GstTaskPrivate).  Returns the new value of the flag, i.e.
+ * @scheduleable on success.
+ * NOTE(review): there is no check that the task is not already running;
+ * presumably callers flip this before the task is started -- confirm. */
+gboolean
+gst_task_set_scheduleable (GstTask * task, gboolean scheduleable)
+{
+  GstTaskClass *klass;
+
+  g_return_val_if_fail (GST_IS_TASK (task), FALSE);
+
+  klass = GST_TASK_GET_CLASS (task);
+  GST_OBJECT_LOCK (task);
+  /* priv->pool == klass->pool means the default pool is still in use */
+  if (task->priv->pool == klass->pool) {
+    GST_OBJECT_UNLOCK (task);
+    return FALSE;
+  }
+
+  task->priv->scheduleable = scheduleable;
+  GST_OBJECT_UNLOCK (task);
+
+  return scheduleable;
+}
+
+/* Requests one more iteration of a scheduleable task.  If no iteration is
+ * currently running, a new one is pushed to the pool immediately;
+ * otherwise the should_schedule flag makes gst_task_func() re-start the
+ * task itself when the running iteration finishes. */
+void
+gst_task_schedule (GstTask * task)
+{
+  g_return_if_fail (GST_IS_TASK (task));
+
+  GST_OBJECT_LOCK (task);
+  GST_DEBUG_OBJECT (task, "Scheduling task");
+  if (task->priv->scheduleable && GET_TASK_STATE (task) == GST_TASK_STARTED) {
+    /* Always re-arm the flag, even while an iteration is running: setting
+     * it only when !task->running would drop a schedule request that races
+     * with a finishing iteration after a prior gst_task_unschedule(),
+     * stalling the task. */
+    task->priv->should_schedule = TRUE;
+    if (!task->running) {
+      GST_DEBUG_OBJECT (task, "Task needs to be scheduled");
+      start_task (task);
+    }
+  }
+  GST_OBJECT_UNLOCK (task);
+}
+
+/* Clears should_schedule so that a scheduleable task is not re-started
+ * when the current iteration (if any) finishes; gst_task_func() checks
+ * this flag on exit.  Takes the object lock only to flip the flag. */
+void
+gst_task_unschedule (GstTask * task)
+{
+  g_return_if_fail (GST_IS_TASK (task));
+
+  GST_OBJECT_LOCK (task);
+  GST_DEBUG_OBJECT (task, "Unscheduling task");
+  task->priv->should_schedule = FALSE;
+  GST_OBJECT_UNLOCK (task);
+}
diff --git a/gst/gsttask.h b/gst/gsttask.h
index 85117fff3..13ec924db 100644
--- a/gst/gsttask.h
+++ b/gst/gsttask.h
@@ -194,6 +194,12 @@ gboolean gst_task_pause (GstTask *task);
gboolean gst_task_join (GstTask *task);
+gboolean gst_task_get_scheduleable(GstTask *task);
+gboolean gst_task_set_scheduleable(GstTask *task, gboolean scheduleable);
+
+void gst_task_schedule (GstTask *task);
+void gst_task_unschedule (GstTask *task);
+
G_END_DECLS
#endif /* __GST_TASK_H__ */
diff --git a/gst/gsttaskpool.c b/gst/gsttaskpool.c
index cbbe1b7ef..347c2ec5e 100644
--- a/gst/gsttaskpool.c
+++ b/gst/gsttaskpool.c
@@ -1,5 +1,6 @@
/* GStreamer
* Copyright (C) 2009 Wim Taymans <wim.taymans@gmail.com>
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
*
* gsttaskpool.c: Pool for streaming threads
*
@@ -38,15 +39,26 @@
GST_DEBUG_CATEGORY_STATIC (taskpool_debug);
#define GST_CAT_DEFAULT (taskpool_debug)
-#ifndef GST_DISABLE_GST_DEBUG
+struct _GstTaskPoolPrivate
+{
+ gint need_schedule_thread;
+ GMainContext *schedule_context;
+ GMainLoop *schedule_loop;
+ GThread *schedule_thread;
+ GMutex schedule_lock;
+ GCond schedule_cond;
+};
+
static void gst_task_pool_finalize (GObject * object);
-#endif
#define _do_init \
{ \
GST_DEBUG_CATEGORY_INIT (taskpool_debug, "taskpool", 0, "Thread pool"); \
}
+#define GST_TASK_POOL_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK_POOL, GstTaskPoolPrivate))
+
G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init);
typedef struct
@@ -126,9 +138,9 @@ gst_task_pool_class_init (GstTaskPoolClass * klass)
gobject_class = (GObjectClass *) klass;
gsttaskpool_class = (GstTaskPoolClass *) klass;
-#ifndef GST_DISABLE_GST_DEBUG
+ g_type_class_add_private (gobject_class, sizeof (GstTaskPoolPrivate));
+
gobject_class->finalize = gst_task_pool_finalize;
-#endif
gsttaskpool_class->prepare = default_prepare;
gsttaskpool_class->cleanup = default_cleanup;
@@ -139,19 +151,32 @@ gst_task_pool_class_init (GstTaskPoolClass * klass)
static void
gst_task_pool_init (GstTaskPool * pool)
{
+ pool->priv = GST_TASK_POOL_GET_PRIVATE (pool);
+
+ pool->priv->need_schedule_thread = 0;
+ pool->priv->schedule_context = NULL;
+ pool->priv->schedule_loop = NULL;
+ pool->priv->schedule_thread = NULL;
+ g_mutex_init (&pool->priv->schedule_lock);
+ g_cond_init (&pool->priv->schedule_cond);
+
/* clear floating flag */
gst_object_ref_sink (pool);
}
-#ifndef GST_DISABLE_GST_DEBUG
static void
gst_task_pool_finalize (GObject * object)
{
- GST_DEBUG ("taskpool %p finalize", object);
+ GstTaskPool *pool = GST_TASK_POOL (object);
+
+ GST_DEBUG_OBJECT (pool, "taskpool finalize");
+
+ g_mutex_clear (&pool->priv->schedule_lock);
+ g_cond_clear (&pool->priv->schedule_cond);
G_OBJECT_CLASS (gst_task_pool_parent_class)->finalize (object);
}
-#endif
+
/**
* gst_task_pool_new:
*
@@ -271,6 +296,99 @@ gst_task_pool_join (GstTaskPool * pool, gpointer id)
klass->join (pool, id);
}
+/* One-shot idle callback attached as the first source of the schedule
+ * thread's main loop: once the loop dispatches it, the thread blocked in
+ * gst_task_pool_need_schedule_thread() is woken up, proving the context
+ * is up and iterating. */
+static gboolean
+main_loop_running_cb (gpointer user_data)
+{
+  GstTaskPool *pool = GST_TASK_POOL (user_data);
+
+  g_mutex_lock (&pool->priv->schedule_lock);
+  g_cond_signal (&pool->priv->schedule_cond);
+  g_mutex_unlock (&pool->priv->schedule_lock);
+
+  return G_SOURCE_REMOVE;
+}
+
+/* Thread function of the pool's schedule thread: attaches an idle source
+ * that signals schedule_cond on first dispatch (see main_loop_running_cb),
+ * then runs the loop until g_main_loop_quit() is called from the last
+ * gst_task_pool_need_schedule_thread (pool, FALSE). */
+static gpointer
+gst_task_pool_schedule_func (gpointer data)
+{
+  GstTaskPool *pool = GST_TASK_POOL (data);
+  GSource *source;
+
+  source = g_idle_source_new ();
+  g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, pool,
+      NULL);
+  g_source_attach (source, pool->priv->schedule_context);
+  g_source_unref (source);
+
+  g_main_loop_run (pool->priv->schedule_loop);
+
+  return NULL;
+}
+
+/* Registers (@needed == TRUE) or releases (@needed == FALSE) interest in
+ * the pool's shared schedule thread.  The first registration spins up a
+ * dedicated thread running a GMainContext/GMainLoop and waits until it is
+ * iterating; the last release shuts it down again.  Returns TRUE while
+ * registering, FALSE otherwise.  Not allowed on the default pool, so
+ * applications must opt in with a custom pool. */
+gboolean
+gst_task_pool_need_schedule_thread (GstTaskPool * pool, gboolean needed)
+{
+  GstTaskPool *def;
+  gboolean ret;
+
+  g_return_val_if_fail (GST_IS_TASK_POOL (pool), FALSE);
+  g_return_val_if_fail (needed || pool->priv->need_schedule_thread > 0, FALSE);
+
+  /* We don't allow this for the default pool.  gst_task_pool_get_default()
+   * returns a reference that must be dropped on *both* paths; the old code
+   * only unreffed it when @pool was the default pool and leaked it
+   * otherwise. */
+  def = gst_task_pool_get_default ();
+  if (pool == def) {
+    gst_object_unref (def);
+    return FALSE;
+  }
+  gst_object_unref (def);
+
+  g_mutex_lock (&pool->priv->schedule_lock);
+  if (needed) {
+    ret = TRUE;
+    if (pool->priv->need_schedule_thread == 0) {
+      pool->priv->schedule_context = g_main_context_new ();
+      pool->priv->schedule_loop =
+          g_main_loop_new (pool->priv->schedule_context, FALSE);
+
+      pool->priv->schedule_thread =
+          g_thread_new (GST_OBJECT_NAME (pool), gst_task_pool_schedule_func,
+          pool);
+
+      /* wait in a predicate loop: g_cond_wait() can wake up spuriously,
+       * so re-check that the schedule thread's loop is actually running */
+      while (!g_main_loop_is_running (pool->priv->schedule_loop))
+        g_cond_wait (&pool->priv->schedule_cond, &pool->priv->schedule_lock);
+    }
+    pool->priv->need_schedule_thread++;
+  } else {
+    ret = FALSE;
+    pool->priv->need_schedule_thread--;
+    if (pool->priv->need_schedule_thread == 0) {
+      /* last user gone: stop the loop, join the thread, tear down */
+      g_main_loop_quit (pool->priv->schedule_loop);
+      g_thread_join (pool->priv->schedule_thread);
+      g_main_loop_unref (pool->priv->schedule_loop);
+      pool->priv->schedule_loop = NULL;
+      g_main_context_unref (pool->priv->schedule_context);
+      pool->priv->schedule_context = NULL;
+      pool->priv->schedule_thread = NULL;
+    }
+  }
+  g_mutex_unlock (&pool->priv->schedule_lock);
+
+  return ret;
+}
+
+/* Returns a new reference to the pool's schedule GMainContext, or NULL if
+ * no schedule thread was requested.  The need_schedule_thread precondition
+ * is checked outside the lock; callers are expected to hold their own
+ * registration (via gst_task_pool_need_schedule_thread()) so the context
+ * cannot be torn down while they use it. */
+GMainContext *
+gst_task_pool_get_schedule_context (GstTaskPool * pool)
+{
+  GMainContext *context;
+
+  g_return_val_if_fail (GST_IS_TASK_POOL (pool), NULL);
+  g_return_val_if_fail (pool->priv->need_schedule_thread > 0, NULL);
+
+  g_mutex_lock (&pool->priv->schedule_lock);
+  context = pool->priv->schedule_context;
+  if (context)
+    g_main_context_ref (context);
+  g_mutex_unlock (&pool->priv->schedule_lock);
+
+  return context;
+}
+
GstTaskPool *
gst_task_pool_get_default (void)
{
diff --git a/gst/gsttaskpool.h b/gst/gsttaskpool.h
index 82aa5baa3..859ec6fc1 100644
--- a/gst/gsttaskpool.h
+++ b/gst/gsttaskpool.h
@@ -37,6 +37,7 @@ G_BEGIN_DECLS
typedef struct _GstTaskPool GstTaskPool;
typedef struct _GstTaskPoolClass GstTaskPoolClass;
+typedef struct _GstTaskPoolPrivate GstTaskPoolPrivate;
/**
* GstTaskPoolFunction:
@@ -57,7 +58,8 @@ struct _GstTaskPool {
/*< private >*/
GThreadPool *pool;
- gpointer _gst_reserved[GST_PADDING];
+ GstTaskPoolPrivate *priv;
+ gpointer _gst_reserved[GST_PADDING-1];
};
/**
@@ -98,6 +100,9 @@ void gst_task_pool_cleanup (GstTaskPool *pool);
GstTaskPool * gst_task_pool_get_default (void);
+gboolean gst_task_pool_need_schedule_thread (GstTaskPool *pool, gboolean needed);
+GMainContext * gst_task_pool_get_schedule_context (GstTaskPool *pool);
+
G_END_DECLS
#endif /* __GST_TASK_POOL_H__ */
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index 02055cdbe..9d6dbde17 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -250,6 +250,31 @@ queue_leaky_get_type (void)
static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
+/* GstElement::post_message override: after chaining up, snoop
+ * STREAM_STATUS CREATE messages to learn whether the application made our
+ * srcpad task scheduleable (see the sync stream-status bus handlers in the
+ * example), and cache that in queue->schedule_task so the streaming code
+ * can pick between gst_task_schedule() and GST_QUEUE_SIGNAL_ADD().
+ * We take an extra ref because chaining up transfers ownership of @msg
+ * and we still need to parse it afterwards.
+ * NOTE(review): assumes GST_PAD_TASK (queue->srcpad) is non-NULL whenever
+ * a CREATE stream-status for this element is posted -- confirm. */
+static gboolean
+gst_queue_post_message (GstElement * element, GstMessage * msg)
+{
+  GstQueue *queue = GST_QUEUE (element);
+  gboolean ret;
+
+  gst_message_ref (msg);
+  ret = GST_ELEMENT_CLASS (parent_class)->post_message (element, msg);
+
+  if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_STREAM_STATUS) {
+    GstStreamStatusType type;
+
+    gst_message_parse_stream_status (msg, &type, NULL);
+    if (type == GST_STREAM_STATUS_TYPE_CREATE) {
+      queue->schedule_task =
+          gst_task_get_scheduleable (GST_PAD_TASK (queue->srcpad));
+      GST_DEBUG_OBJECT (queue, "Scheduling task %d", queue->schedule_task);
+    }
+  }
+
+  gst_message_unref (msg);
+
+  return ret;
+}
+
static void
gst_queue_class_init (GstQueueClass * klass)
{
@@ -404,6 +429,8 @@ gst_queue_class_init (GstQueueClass * klass)
gobject_class->finalize = gst_queue_finalize;
+ gstelement_class->post_message = gst_queue_post_message;
+
gst_element_class_set_static_metadata (gstelement_class,
"Queue",
"Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
@@ -726,7 +753,10 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
qitem->is_query = FALSE;
qitem->size = bsize;
gst_queue_array_push_tail (queue->queue, qitem);
- GST_QUEUE_SIGNAL_ADD (queue);
+ if (queue->schedule_task)
+ gst_task_schedule (GST_PAD_TASK (queue->srcpad));
+ else
+ GST_QUEUE_SIGNAL_ADD (queue);
}
static gboolean
@@ -760,7 +790,10 @@ gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
qitem->is_query = FALSE;
qitem->size = bsize;
gst_queue_array_push_tail (queue->queue, qitem);
- GST_QUEUE_SIGNAL_ADD (queue);
+ if (queue->schedule_task)
+ gst_task_schedule (GST_PAD_TASK (queue->srcpad));
+ else
+ GST_QUEUE_SIGNAL_ADD (queue);
}
static inline void
@@ -804,7 +837,10 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
qitem->item = item;
qitem->is_query = FALSE;
gst_queue_array_push_tail (queue->queue, qitem);
- GST_QUEUE_SIGNAL_ADD (queue);
+ if (queue->schedule_task)
+ gst_task_schedule (GST_PAD_TASK (queue->srcpad));
+ else
+ GST_QUEUE_SIGNAL_ADD (queue);
}
/* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
@@ -915,7 +951,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_FLUSHING;
/* unblock the loop and chain functions */
- GST_QUEUE_SIGNAL_ADD (queue);
+ if (queue->schedule_task)
+ gst_task_schedule (GST_PAD_TASK (queue->srcpad));
+ else
+ GST_QUEUE_SIGNAL_ADD (queue);
GST_QUEUE_SIGNAL_DEL (queue);
queue->last_query = FALSE;
g_cond_signal (&queue->query_handled);
@@ -1023,7 +1062,10 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
qitem->item = GST_MINI_OBJECT_CAST (query);
qitem->is_query = TRUE;
gst_queue_array_push_tail (queue->queue, qitem);
- GST_QUEUE_SIGNAL_ADD (queue);
+ if (queue->schedule_task)
+ gst_task_schedule (GST_PAD_TASK (queue->srcpad));
+ else
+ GST_QUEUE_SIGNAL_ADD (queue);
g_cond_wait (&queue->query_handled, &queue->qlock);
if (queue->srcresult != GST_FLOW_OK)
goto out_flushing;
@@ -1453,38 +1495,47 @@ gst_queue_loop (GstPad * pad)
{
GstQueue *queue;
GstFlowReturn ret;
+ gboolean empty;
queue = (GstQueue *) GST_PAD_PARENT (pad);
/* have to lock for thread-safety */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
- while (gst_queue_is_empty (queue)) {
- GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty");
- if (!queue->silent) {
- GST_QUEUE_MUTEX_UNLOCK (queue);
- g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0);
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
- }
+ if (queue->schedule_task) {
+ empty = gst_queue_is_empty (queue);
+ if (empty)
+ gst_task_unschedule (GST_PAD_TASK (pad));
+ } else {
+ while ((empty = gst_queue_is_empty (queue))) {
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty");
+ if (!queue->silent) {
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0);
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ }
- /* we recheck, the signal could have changed the thresholds */
- while (gst_queue_is_empty (queue)) {
- GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
- }
+ /* we recheck, the signal could have changed the thresholds */
+ while ((empty = gst_queue_is_empty (queue))) {
+ GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+ }
- GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty");
- if (!queue->silent) {
- GST_QUEUE_MUTEX_UNLOCK (queue);
- g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
- g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0);
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty");
+ if (!queue->silent) {
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
+ g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0);
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ }
}
}
- ret = gst_queue_push_one (queue);
- queue->srcresult = ret;
- if (ret != GST_FLOW_OK)
- goto out_flushing;
+ if (!empty) {
+ ret = gst_queue_push_one (queue);
+ queue->srcresult = ret;
+ if (ret != GST_FLOW_OK)
+ goto out_flushing;
+ }
GST_QUEUE_MUTEX_UNLOCK (queue);
diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h
index 513457803..e853f2dac 100644
--- a/plugins/elements/gstqueue.h
+++ b/plugins/elements/gstqueue.h
@@ -137,6 +137,9 @@ struct _GstQueue {
gboolean last_query;
gboolean flush_on_eos; /* flush on EOS */
+
+ /* TRUE if we schedule/unschedule tasks */
+ gboolean schedule_task;
};
struct _GstQueueClass {
diff --git a/tests/examples/streams/.gitignore b/tests/examples/streams/.gitignore
index d8cfe30d0..7a643dd3e 100644
--- a/tests/examples/streams/.gitignore
+++ b/tests/examples/streams/.gitignore
@@ -1,5 +1,6 @@
stream-status
rtpool-test
+schedule-tasks
*.bb
*.bbg
*.da
diff --git a/tests/examples/streams/Makefile.am b/tests/examples/streams/Makefile.am
index e1f87acaa..22541c961 100644
--- a/tests/examples/streams/Makefile.am
+++ b/tests/examples/streams/Makefile.am
@@ -1,4 +1,4 @@
-noinst_PROGRAMS = stream-status
+noinst_PROGRAMS = schedule-tasks stream-status
if HAVE_PTHREAD
noinst_PROGRAMS += rtpool-test
endif
@@ -11,4 +11,8 @@ rtpool_test_SOURCES = rtpool-test.c testrtpool.h testrtpool.c
rtpool_test_LDADD = $(GST_OBJ_LIBS) $(PTHREAD_LIBS)
rtpool_test_CFLAGS = $(GST_OBJ_CFLAGS) $(PTHREAD_CFLAGS)
+schedule_tasks_SOURCES = schedule-tasks.c
+schedule_tasks_LDADD = $(GST_OBJ_LIBS)
+schedule_tasks_CFLAGS = $(GST_OBJ_CFLAGS)
+
EXTRA_DIST = rtpool-test.c testrtpool.h testrtpool.c
diff --git a/tests/examples/streams/schedule-tasks.c b/tests/examples/streams/schedule-tasks.c
new file mode 100644
index 000000000..bdc768991
--- /dev/null
+++ b/tests/examples/streams/schedule-tasks.c
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ */
+
+#include <gst/gst.h>
+
+static GMainLoop *loop;
+static GstElement *pipeline;
+#define N_SINKS 10
+static GstElement *src, *tee, *queue[N_SINKS], *sink[N_SINKS];
+
+static GstTaskPool *pool;
+
+static gboolean
+message_cb (GstBus * bus, GstMessage * message, gpointer user_data)
+{
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_ERROR:{
+ GError *err = NULL;
+ gchar *name, *debug = NULL;
+
+ name = gst_object_get_path_string (message->src);
+ gst_message_parse_error (message, &err, &debug);
+
+ g_printerr ("ERROR: from element %s: %s\n", name, err->message);
+ if (debug != NULL)
+ g_printerr ("Additional debug info:\n%s\n", debug);
+
+ g_error_free (err);
+ g_free (debug);
+ g_free (name);
+
+ g_main_loop_quit (loop);
+ break;
+ }
+ case GST_MESSAGE_WARNING:{
+ GError *err = NULL;
+ gchar *name, *debug = NULL;
+
+ name = gst_object_get_path_string (message->src);
+ gst_message_parse_warning (message, &err, &debug);
+
+ g_printerr ("ERROR: from element %s: %s\n", name, err->message);
+ if (debug != NULL)
+ g_printerr ("Additional debug info:\n%s\n", debug);
+
+ g_error_free (err);
+ g_free (debug);
+ g_free (name);
+ break;
+ }
+ case GST_MESSAGE_EOS:
+ g_print ("Got EOS\n");
+ g_main_loop_quit (loop);
+ break;
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+/* Sync bus handler for stream-status messages: when a streaming task is
+ * created, move it to the limited custom pool and mark it scheduleable.
+ * This must run as a *sync* handler so the task is configured before it
+ * actually starts streaming.
+ * NOTE(review): assumes GST_MESSAGE_SRC is the pad that owns the task,
+ * so GST_PAD_TASK applies -- confirm for all stream-status producers. */
+static void
+stream_status_cb (GstBus * bus, GstMessage * msg, gpointer user_data)
+{
+  GstStreamStatusType type;
+
+  gst_message_parse_stream_status (msg, &type, NULL);
+  if (type == GST_STREAM_STATUS_TYPE_CREATE) {
+    GstTask *task = GST_PAD_TASK (GST_MESSAGE_SRC (msg));
+    gboolean ret;
+
+    /* pool must be set first: set_scheduleable() refuses the default pool */
+    gst_task_set_pool (task, pool);
+    ret = gst_task_set_scheduleable (task, TRUE);
+    g_assert (ret);
+  }
+}
+
+/* fakesink handoff: print which pool thread pushed the buffer, to show
+ * that N_SINKS queues share only the pool's two threads. */
+static void
+handoff_cb (GstElement * sink, GstBuffer * buf, GstPad * pad,
+    gpointer user_data)
+{
+  g_print ("%s: handoff thread %p timestamp %" GST_TIME_FORMAT "\n",
+      GST_OBJECT_NAME (sink), g_thread_self (),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+}
+
+/* Builds fakesrc ! tee ! (queue ! fakesink) x N_SINKS where all queue
+ * tasks run scheduleable on a 2-thread custom pool, then runs until
+ * EOS/error. */
+int
+main (int argc, char **argv)
+{
+  GstBus *bus;
+  gint i;
+
+  gst_init (&argc, &argv);
+
+  /* a pool with only 2 threads shared by all N_SINKS queue tasks */
+  pool = gst_task_pool_new_full (2, FALSE);
+  gst_task_pool_prepare (pool, NULL);
+
+  pipeline = gst_pipeline_new (NULL);
+  src = gst_element_factory_make ("fakesrc", NULL);
+  tee = gst_element_factory_make ("tee", NULL);
+
+  /* fix: check creation *before* the elements are configured/added/linked;
+   * the old code only checked after they had already been dereferenced */
+  if (!pipeline || !src || !tee) {
+    g_error ("Failed to create elements");
+    return -1;
+  }
+
+  g_object_set (src, "is-live", TRUE, "format", GST_FORMAT_TIME, "num-buffers",
+      1000, NULL);
+
+  gst_bin_add_many (GST_BIN (pipeline), src, tee, NULL);
+  gst_element_link_pads (src, "src", tee, "sink");
+
+  for (i = 0; i < N_SINKS; i++) {
+    GstPad *srcpad, *sinkpad;
+
+    queue[i] = gst_element_factory_make ("queue", NULL);
+    sink[i] = gst_element_factory_make ("fakesink", NULL);
+
+    /* fix: these were never checked at all */
+    if (!queue[i] || !sink[i]) {
+      g_error ("Failed to create elements");
+      return -1;
+    }
+
+    gst_bin_add_many (GST_BIN (pipeline), queue[i], sink[i], NULL);
+
+    g_object_set (sink[i], "async", FALSE, "signal-handoffs", TRUE, NULL);
+    g_signal_connect (sink[i], "handoff", G_CALLBACK (handoff_cb), NULL);
+
+    srcpad = gst_element_get_request_pad (tee, "src_%u");
+    sinkpad = gst_element_get_static_pad (queue[i], "sink");
+    gst_pad_link (srcpad, sinkpad);
+    gst_object_unref (srcpad);
+    gst_object_unref (sinkpad);
+
+    gst_element_link_pads (queue[i], "src", sink[i], "sink");
+  }
+
+  loop = g_main_loop_new (NULL, FALSE);
+
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  gst_bus_add_signal_watch (bus);
+  g_signal_connect (G_OBJECT (bus), "message", G_CALLBACK (message_cb), NULL);
+  /* stream-status must be handled synchronously so tasks get reconfigured
+   * before they start (see stream_status_cb) */
+  gst_bus_enable_sync_message_emission (bus);
+  g_signal_connect (G_OBJECT (bus), "sync-message::stream-status",
+      G_CALLBACK (stream_status_cb), NULL);
+  gst_object_unref (GST_OBJECT (bus));
+
+  if (gst_element_set_state (pipeline,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+    g_error ("Failed to go into PLAYING state");
+    return -3;
+  }
+
+  g_main_loop_run (loop);
+
+  gst_element_set_state (pipeline, GST_STATE_NULL);
+
+  g_main_loop_unref (loop);
+  gst_object_unref (pipeline);
+
+  gst_task_pool_cleanup (pool);
+  gst_object_unref (pool);
+
+  return 0;
+}