summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2013-11-20 11:57:48 -0800
committerSebastian Dröge <sebastian@centricular.com>2014-01-06 19:16:06 +0100
commit5d196befea2d647833164657adc579e19989dcdf (patch)
tree679615cb269ab229546472e3f963255ab0297d0d
parent1de533735b93ec8cbad346267f99e6519d0e3cc8 (diff)
WIP: collectpads: Refactor threading and add a separate streaming thread for the output sidecollectpads
This will allow us to properly support live streams later and separate flushing of a subset of sinkpad streams.
-rw-r--r--libs/gst/base/gstcollectpads.c198
1 files changed, 96 insertions, 102 deletions
diff --git a/libs/gst/base/gstcollectpads.c b/libs/gst/base/gstcollectpads.c
index c82b0d46f..d11ffd22e 100644
--- a/libs/gst/base/gstcollectpads.c
+++ b/libs/gst/base/gstcollectpads.c
@@ -96,12 +96,20 @@ struct _GstCollectDataPrivate
/* refcounting for struct, and destroy callback */
GstCollectDataDestroyNotify destroy_notify;
gint refcount;
+
+ gsize size;
+
+ GMutex pad_lock;
+ GCond pad_cond;
};
struct _GstCollectPadsPrivate
{
/* with LOCK and/or STREAM_LOCK */
gboolean started;
+ GThread *thread;
+ GMutex loop_lock;
+ GCond loop_cond;
/* with STREAM_LOCK */
guint32 cookie; /* @data list cookie */
@@ -130,11 +138,6 @@ struct _GstCollectPadsPrivate
GstCollectPadsFlushFunction flush_func;
gpointer flush_user_data;
- /* no other lock needed */
- GMutex evt_lock; /* these make up sort of poor man's event signaling */
- GCond evt_cond;
- guint32 evt_cookie;
-
gboolean seeking;
gboolean pending_flush_start;
gboolean pending_flush_stop;
@@ -163,50 +166,7 @@ static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
pads, GstCollectData * data, GstQuery * query, gpointer user_data);
-
-/* Some properties are protected by LOCK, others by STREAM_LOCK
- * However, manipulating either of these partitions may require
- * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
- * Alternative implementations are possible, e.g. some low-level re-implementing
- * of the 2 above locks to drop both of them atomically when going into _WAIT.
- */
-#define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
-#define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
-#define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START { \
- g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
- /* should work unless a lot of event'ing and thread starvation */\
- while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie) \
- g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads), \
- GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
- cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \
- g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
-} G_STMT_END
-#define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
- GTimeVal __tv; \
- \
- g_get_current_time (&tv); \
- g_time_val_add (&tv, timeout); \
- \
- g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
- /* should work unless a lot of event'ing and thread starvation */\
- while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie) \
- g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads), \
- GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv); \
- cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \
- g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
-} G_STMT_END
-#define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START { \
- g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
- /* never mind wrap-around */ \
- ++(((GstCollectPads *) pads)->priv->evt_cookie); \
- g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads)); \
- g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
-} G_STMT_END
-#define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START { \
- g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
- cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \
- g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
-} G_STMT_END
+static gpointer gst_collect_pads_loop (gpointer data);
static void
gst_collect_pads_class_init (GstCollectPadsClass * klass)
@@ -257,10 +217,8 @@ gst_collect_pads_init (GstCollectPads * pads)
pads->priv->pad_cookie = 0;
pads->priv->pad_list = NULL;
- /* members for event */
- g_mutex_init (&pads->priv->evt_lock);
- g_cond_init (&pads->priv->evt_cond);
- pads->priv->evt_cookie = 0;
+ g_mutex_init (&pads->priv->loop_lock);
+ g_cond_init (&pads->priv->loop_cond);
pads->priv->seeking = FALSE;
pads->priv->pending_flush_start = FALSE;
@@ -276,8 +234,8 @@ gst_collect_pads_finalize (GObject * object)
g_rec_mutex_clear (&pads->stream_lock);
- g_cond_clear (&pads->priv->evt_cond);
- g_mutex_clear (&pads->priv->evt_lock);
+ g_cond_clear (&pads->priv->loop_cond);
+ g_mutex_clear (&pads->priv->loop_lock);
/* Remove pads and free pads list */
g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
@@ -411,6 +369,8 @@ ref_data (GstCollectData * data)
static void
unref_data (GstCollectData * data)
{
+ gsize size;
+
g_assert (data != NULL);
g_assert (data->priv->refcount > 0);
@@ -424,8 +384,14 @@ unref_data (GstCollectData * data)
if (data->buffer) {
gst_buffer_unref (data->buffer);
}
- g_free (data->priv);
- g_free (data);
+
+ g_cond_clear (&data->priv->pad_cond);
+ g_mutex_clear (&data->priv->pad_lock);
+
+ size = data->priv->size;
+
+ g_slice_free (GstCollectPadsPrivate, data->priv);
+ g_slice_free1 (size, data);
}
/**
@@ -621,8 +587,9 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
- data = g_malloc0 (size);
- data->priv = g_new0 (GstCollectDataPrivate, 1);
+ data = g_slice_alloc0 (size);
+ data->priv = g_slice_new0 (GstCollectDataPrivate);
+ data->priv->size = size;
data->collect = pads;
data->pad = gst_object_ref (pad);
data->buffer = NULL;
@@ -632,6 +599,8 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
data->priv->refcount = 1;
data->priv->destroy_notify = destroy_notify;
+ g_mutex_init (&data->priv->pad_lock);
+ g_cond_init (&data->priv->pad_cond);
GST_OBJECT_LOCK (pads);
GST_OBJECT_LOCK (pad);
@@ -730,8 +699,10 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
pads->priv->pad_cookie++;
- /* signal waiters because something changed */
- GST_COLLECT_PADS_EVT_BROADCAST (pads);
+ /* signal loop because something changed */
+ g_mutex_lock (&pads->priv->loop_lock);
+ g_cond_signal (&pads->priv->loop_cond);
+ g_mutex_unlock (&pads->priv->loop_lock);
/* deactivate the pad when needed */
if (!pads->priv->started)
@@ -779,10 +750,11 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
gst_collect_pads_clear (pads, cdata);
GST_OBJECT_UNLOCK (cdata->pad);
}
+ /* inform _chain of changes */
+ g_mutex_lock (&pad->priv->pad_lock);
+ g_cond_signal (&pad->priv->pad_cond);
+ g_mutex_unlock (&pad->priv->pad_lock);
}
-
- /* inform _chain of changes */
- GST_COLLECT_PADS_EVT_BROADCAST (pads);
}
/**
@@ -848,6 +820,8 @@ gst_collect_pads_start (GstCollectPads * pads)
gst_collect_pads_set_flushing_unlocked (pads, FALSE);
/* Start collect pads */
+ pads->priv->thread =
+ g_thread_new ("collectpads-src", gst_collect_pads_loop, pads);
pads->priv->started = TRUE;
GST_OBJECT_UNLOCK (pads);
GST_COLLECT_PADS_STREAM_UNLOCK (pads);
@@ -884,6 +858,15 @@ gst_collect_pads_stop (GstCollectPads * pads)
pads->priv->eospads = 0;
pads->priv->queuedpads = 0;
+ /* Stop the loop function */
+ g_mutex_lock (pads->priv->loop_lock);
+ g_cond_signal (pads->priv->loop_cond);
+ g_mutex_unlock (pads->priv->loop_lock);
+
+ g_thread_join (pads->priv->thread);
+ g_thread_unref (pads->priv->thread);
+ pads->priv->thread = NULL;
+
/* loop over the master pad list and flush buffers */
collected = pads->priv->pad_list;
for (; collected; collected = g_slist_next (collected)) {
@@ -897,6 +880,10 @@ gst_collect_pads_stop (GstCollectPads * pads)
data->pos = 0;
}
GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
+
+ g_mutex_lock (&data->priv->pad_lock);
+ g_cond_signal (&data->priv->pad_cond);
+ g_mutex_unlock (&data->priv->pad_lock);
}
if (pads->priv->earliest_data)
@@ -905,8 +892,6 @@ gst_collect_pads_stop (GstCollectPads * pads)
pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
GST_OBJECT_UNLOCK (pads);
- /* Wake them up so they can end the chain functions. */
- GST_COLLECT_PADS_EVT_BROADCAST (pads);
GST_COLLECT_PADS_STREAM_UNLOCK (pads);
}
@@ -966,15 +951,19 @@ gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
g_return_val_if_fail (data != NULL, NULL);
+ g_mutex_lock (&data->priv->pad_lock);
if ((result = data->buffer)) {
data->buffer = NULL;
data->pos = 0;
/* one less pad with queued data now */
+ g_mutex_lock (&pads->priv->loop_lock);
if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
pads->priv->queuedpads--;
+ g_mutex_unlock (&pads->priv->loop_lock);
}
- GST_COLLECT_PADS_EVT_BROADCAST (pads);
+ g_mutex_signal (&data->priv->pad_cond);
+ g_mutex_unlock (&data->priv->pad_lock);
GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
GST_DEBUG_PAD_NAME (data->pad), result);
@@ -1208,6 +1197,7 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
(GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
! !waiting)) {
+ g_mutex_lock (&pads->priv->loop_lock);
/* Set waiting state for this pad */
if (waiting)
GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
@@ -1222,8 +1212,9 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
pads->priv->queuedpads++;
}
- /* signal waiters because something changed */
- GST_COLLECT_PADS_EVT_BROADCAST (pads);
+ /* signal loop because something changed */
+ g_cond_signal (&pads->priv->loop_cond);
+ g_mutex_unlock (&pads->priv->loop_lock);
}
}
@@ -2105,7 +2096,6 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstCollectPads *pads;
GstFlowReturn ret;
GstBuffer **buffer_p;
- guint32 cookie;
GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
@@ -2119,8 +2109,10 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
pads = data->collect;
- GST_COLLECT_PADS_STREAM_LOCK (pads);
+ g_mutex_lock (&pads->priv->pad_lock);
+
/* if not started, bail out */
+ g_mutex_lock (&pads->priv->loop_lock);
if (G_UNLIKELY (!pads->priv->started))
goto not_started;
/* check if this pad is flushing */
@@ -2153,8 +2145,11 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GST_DEBUG_PAD_NAME (pad));
/* One more pad has data queued */
- if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
+ if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
+ g_mutex_lock (&pads->priv->loop_lock);
pads->priv->queuedpads++;
+ g_mutex_unlock (&pads->priv->loop_lock);
+ }
buffer_p = &data->buffer;
gst_buffer_replace (buffer_p, buffer);
@@ -2172,25 +2167,14 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
/* While we have data queued on this pad try to collect stuff */
do {
- /* Check if our collected condition is matched and call the collected
- * function if it is */
- ret = gst_collect_pads_check_collected (pads);
- /* when an error occurs, we want to report this back to the caller ASAP
- * without having to block if the buffer was not popped */
- if (G_UNLIKELY (ret != GST_FLOW_OK))
- goto error;
+ g_mutex_lock (&pads->priv->loop_lock);
+ g_cond_signal (&pads->priv->loop_cond);
+ g_mutex_unlock (&pads->priv->loop_lock);
- /* data was consumed, we can exit and accept new data */
- if (data->buffer == NULL)
- break;
+ GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
+ GST_DEBUG_PAD_NAME (pad));
- /* Having the _INIT here means we don't care about any broadcast up to here
- * (most of which occur with STREAM_LOCK held, so could not have happened
- * anyway). We do care about e.g. a remove initiated broadcast as of this
- * point. Putting it here also makes this thread ignores any evt it raised
- * itself (as is a usual WAIT semantic).
- */
- GST_COLLECT_PADS_EVT_INIT (cookie);
+ g_cond_wait (&pads->priv->pad_cond, &pads->priv->pad_lock);
/* pad could be removed and re-added */
unref_data (data);
@@ -2200,21 +2184,10 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
ref_data (data);
GST_OBJECT_UNLOCK (pad);
- GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
- GST_DEBUG_PAD_NAME (pad));
-
- /* wait to be collected, this must happen from another thread triggered
- * by the _chain function of another pad. We release the lock so we
- * can get stopped or flushed as well. We can however not get EOS
- * because we still hold the STREAM_LOCK.
- */
- GST_COLLECT_PADS_STREAM_UNLOCK (pads);
- GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
- GST_COLLECT_PADS_STREAM_LOCK (pads);
-
GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
/* after a signal, we could be stopped */
+ g_mutex_lock (&pads->priv->loop_lock);
if (G_UNLIKELY (!pads->priv->started))
goto not_started;
/* check if this pad is flushing */
@@ -2225,7 +2198,7 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
while (data->buffer != NULL);
unlock_done:
- GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+ g_mutex_unlock (&pads->priv->pad_lock);
/* data is definitely NULL if pad_removed goto was run. */
if (data)
unref_data (data);
@@ -2250,6 +2223,7 @@ no_data:
}
not_started:
{
+ g_mutex_unlock (&pads->priv->loop_lock);
GST_DEBUG ("not started");
gst_collect_pads_clear (pads, data);
ret = GST_FLOW_FLUSHING;
@@ -2285,3 +2259,23 @@ error:
goto unlock_done;
}
}
+
+static gpointer
+gst_collect_pads_loop (gpointer data)
+{
+ GstCollectPads *pads = data;
+
+#if 0
+ /* Check if our collected condition is matched and call the collected
+ * function if it is */
+ ret = gst_collect_pads_check_collected (pads);
+ /* when an error occurs, we want to report this back to the caller ASAP
+ * without having to block if the buffer was not popped */
+ if (G_UNLIKELY (ret != GST_FLOW_OK))
+ goto error;
+#endif
+
+ while (TRUE);
+
+ return NULL;
+}