diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2013-11-20 11:57:48 -0800 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2014-01-06 19:16:06 +0100 |
commit | 5d196befea2d647833164657adc579e19989dcdf (patch) | |
tree | 679615cb269ab229546472e3f963255ab0297d0d | |
parent | 1de533735b93ec8cbad346267f99e6519d0e3cc8 (diff) |
WIP: collectpads: Refactor threading and add a separate streaming thread for the output sidecollectpads
This will allow us to properly support live streams later and separate
flushing of a subset of sinkpad streams.
-rw-r--r-- | libs/gst/base/gstcollectpads.c | 198 |
1 files changed, 96 insertions, 102 deletions
diff --git a/libs/gst/base/gstcollectpads.c b/libs/gst/base/gstcollectpads.c index c82b0d46f..d11ffd22e 100644 --- a/libs/gst/base/gstcollectpads.c +++ b/libs/gst/base/gstcollectpads.c @@ -96,12 +96,20 @@ struct _GstCollectDataPrivate /* refcounting for struct, and destroy callback */ GstCollectDataDestroyNotify destroy_notify; gint refcount; + + gsize size; + + GMutex pad_lock; + GCond pad_cond; }; struct _GstCollectPadsPrivate { /* with LOCK and/or STREAM_LOCK */ gboolean started; + GThread *thread; + GMutex loop_lock; + GCond loop_cond; /* with STREAM_LOCK */ guint32 cookie; /* @data list cookie */ @@ -130,11 +138,6 @@ struct _GstCollectPadsPrivate GstCollectPadsFlushFunction flush_func; gpointer flush_user_data; - /* no other lock needed */ - GMutex evt_lock; /* these make up sort of poor man's event signaling */ - GCond evt_cond; - guint32 evt_cookie; - gboolean seeking; gboolean pending_flush_start; gboolean pending_flush_stop; @@ -163,50 +166,7 @@ static gboolean gst_collect_pads_event_default_internal (GstCollectPads * static gboolean gst_collect_pads_query_default_internal (GstCollectPads * pads, GstCollectData * data, GstQuery * query, gpointer user_data); - -/* Some properties are protected by LOCK, others by STREAM_LOCK - * However, manipulating either of these partitions may require - * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races - * Alternative implementations are possible, e.g. some low-level re-implementing - * of the 2 above locks to drop both of them atomically when going into _WAIT. - */ -#define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond) -#define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock) -#define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START { \ - g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ - /* should work unless a lot of event'ing and thread starvation */\ - while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie) \ - g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads), \ - GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ - cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \ - g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ -} G_STMT_END -#define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \ - GTimeVal __tv; \ - \ - g_get_current_time (&tv); \ - g_time_val_add (&tv, timeout); \ - \ - g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ - /* should work unless a lot of event'ing and thread starvation */\ - while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie) \ - g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads), \ - GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv); \ - cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \ - g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ -} G_STMT_END -#define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START { \ - g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ - /* never mind wrap-around */ \ - ++(((GstCollectPads *) pads)->priv->evt_cookie); \ - g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads)); \ - g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ -} G_STMT_END -#define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START { \ - g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ - cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \ - g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \ -} G_STMT_END +static gpointer gst_collect_pads_loop (gpointer data); static void gst_collect_pads_class_init (GstCollectPadsClass * klass) @@ -257,10 +217,8 @@ gst_collect_pads_init (GstCollectPads * pads) pads->priv->pad_cookie = 0; pads->priv->pad_list = NULL; - /* members for event */ - g_mutex_init (&pads->priv->evt_lock); - g_cond_init (&pads->priv->evt_cond); - pads->priv->evt_cookie = 0; + g_mutex_init (&pads->priv->loop_lock); + g_cond_init (&pads->priv->loop_cond); pads->priv->seeking = FALSE; pads->priv->pending_flush_start = FALSE; @@ -276,8 +234,8 @@ gst_collect_pads_finalize (GObject * object) g_rec_mutex_clear (&pads->stream_lock); - g_cond_clear (&pads->priv->evt_cond); - g_mutex_clear (&pads->priv->evt_lock); + g_cond_clear (&pads->priv->loop_cond); + g_mutex_clear (&pads->priv->loop_lock); /* Remove pads and free pads list */ g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL); @@ -411,6 +369,8 @@ ref_data (GstCollectData * data) static void unref_data (GstCollectData * data) { + gsize size; + g_assert (data != NULL); g_assert (data->priv->refcount > 0); @@ -424,8 +384,14 @@ unref_data (GstCollectData * data) if (data->buffer) { gst_buffer_unref (data->buffer); } - g_free (data->priv); - g_free (data); + + g_cond_clear (&data->priv->pad_cond); + g_mutex_clear (&data->priv->pad_lock); + + size = data->priv->size; + + g_slice_free (GstCollectPadsPrivate, data->priv); + g_slice_free1 (size, data); } /** @@ -621,8 +587,9 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size, GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad)); - data = g_malloc0 (size); - data->priv = g_new0 (GstCollectDataPrivate, 1); + data = g_slice_alloc0 (size); + data->priv = g_slice_new0 (GstCollectDataPrivate); + data->priv->size = size; data->collect = pads; data->pad = gst_object_ref (pad); data->buffer = NULL; @@ -632,6 +599,8 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size, data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0; data->priv->refcount = 1; data->priv->destroy_notify = destroy_notify; + g_mutex_init (&data->priv->pad_lock); + g_cond_init (&data->priv->pad_cond); GST_OBJECT_LOCK (pads); GST_OBJECT_LOCK (pad); @@ -730,8 +699,10 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad) pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list); pads->priv->pad_cookie++; - /* signal waiters because something changed */ - GST_COLLECT_PADS_EVT_BROADCAST (pads); + /* signal loop because something changed */ + g_mutex_lock (&pads->priv->loop_lock); + g_cond_signal (&pads->priv->loop_cond); + g_mutex_unlock (&pads->priv->loop_lock); /* deactivate the pad when needed */ if (!pads->priv->started) @@ -779,10 +750,11 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads, gst_collect_pads_clear (pads, cdata); GST_OBJECT_UNLOCK (cdata->pad); } + /* inform _chain of changes */ + g_mutex_lock (&pad->priv->pad_lock); + g_cond_signal (&pad->priv->pad_cond); + g_mutex_unlock (&pad->priv->pad_lock); } - - /* inform _chain of changes */ - GST_COLLECT_PADS_EVT_BROADCAST (pads); } /** @@ -848,6 +820,8 @@ gst_collect_pads_start (GstCollectPads * pads) gst_collect_pads_set_flushing_unlocked (pads, FALSE); /* Start collect pads */ + pads->priv->thread = + g_thread_new ("collectpads-src", gst_collect_pads_loop, pads); pads->priv->started = TRUE; GST_OBJECT_UNLOCK (pads); GST_COLLECT_PADS_STREAM_UNLOCK (pads); @@ -884,6 +858,15 @@ gst_collect_pads_stop (GstCollectPads * pads) pads->priv->eospads = 0; pads->priv->queuedpads = 0; + /* Stop the loop function */ + g_mutex_lock (pads->priv->loop_lock); + g_cond_signal (pads->priv->loop_cond); + g_mutex_unlock (pads->priv->loop_lock); + + g_thread_join (pads->priv->thread); + g_thread_unref (pads->priv->thread); + pads->priv->thread = NULL; + /* loop over the master pad list and flush buffers */ collected = pads->priv->pad_list; for (; collected; collected = g_slist_next (collected)) { @@ -897,6 +880,10 @@ gst_collect_pads_stop (GstCollectPads * pads) data->pos = 0; } GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS); + + g_mutex_lock (&data->priv->pad_lock); + g_cond_signal (&data->priv->pad_cond); + g_mutex_unlock (&data->priv->pad_lock); } if (pads->priv->earliest_data) @@ -905,8 +892,6 @@ gst_collect_pads_stop (GstCollectPads * pads) pads->priv->earliest_time = GST_CLOCK_TIME_NONE; GST_OBJECT_UNLOCK (pads); - /* Wake them up so they can end the chain functions. */ - GST_COLLECT_PADS_EVT_BROADCAST (pads); GST_COLLECT_PADS_STREAM_UNLOCK (pads); } @@ -966,15 +951,19 @@ gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data) g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL); g_return_val_if_fail (data != NULL, NULL); + g_mutex_lock (&data->priv->pad_lock); if ((result = data->buffer)) { data->buffer = NULL; data->pos = 0; /* one less pad with queued data now */ + g_mutex_lock (&pads->priv->loop_lock); if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) pads->priv->queuedpads--; + g_mutex_unlock (&pads->priv->loop_lock); } - GST_COLLECT_PADS_EVT_BROADCAST (pads); + g_mutex_signal (&data->priv->pad_cond); + g_mutex_unlock (&data->priv->pad_lock); GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p", GST_DEBUG_PAD_NAME (data->pad), result); @@ -1208,6 +1197,7 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data, if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) && (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) != ! !waiting)) { + g_mutex_lock (&pads->priv->loop_lock); /* Set waiting state for this pad */ if (waiting) GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING); @@ -1222,8 +1212,9 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data, pads->priv->queuedpads++; } - /* signal waiters because something changed */ - GST_COLLECT_PADS_EVT_BROADCAST (pads); + /* signal loop because something changed */ + g_cond_signal (&pads->priv->loop_cond); + g_mutex_unlock (&pads->priv->loop_lock); } } @@ -2105,7 +2096,6 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstCollectPads *pads; GstFlowReturn ret; GstBuffer **buffer_p; - guint32 cookie; GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad)); @@ -2119,8 +2109,10 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) pads = data->collect; - GST_COLLECT_PADS_STREAM_LOCK (pads); + g_mutex_lock (&pads->priv->pad_lock); + /* if not started, bail out */ + g_mutex_lock (&pads->priv->loop_lock); if (G_UNLIKELY (!pads->priv->started)) goto not_started; /* check if this pad is flushing */ @@ -2153,8 +2145,11 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_DEBUG_PAD_NAME (pad)); /* One more pad has data queued */ - if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) + if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) { + g_mutex_lock (&pads->priv->loop_lock); pads->priv->queuedpads++; + g_mutex_unlock (&pads->priv->loop_lock); + } buffer_p = &data->buffer; gst_buffer_replace (buffer_p, buffer); @@ -2172,25 +2167,14 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* While we have data queued on this pad try to collect stuff */ do { - /* Check if our collected condition is matched and call the collected - * function if it is */ - ret = gst_collect_pads_check_collected (pads); - /* when an error occurs, we want to report this back to the caller ASAP - * without having to block if the buffer was not popped */ - if (G_UNLIKELY (ret != GST_FLOW_OK)) - goto error; + g_mutex_lock (&pads->priv->loop_lock); + g_cond_signal (&pads->priv->loop_cond); + g_mutex_unlock (&pads->priv->loop_lock); - /* data was consumed, we can exit and accept new data */ - if (data->buffer == NULL) - break; + GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting", + GST_DEBUG_PAD_NAME (pad)); - /* Having the _INIT here means we don't care about any broadcast up to here - * (most of which occur with STREAM_LOCK held, so could not have happened - * anyway). We do care about e.g. a remove initiated broadcast as of this - * point. Putting it here also makes this thread ignores any evt it raised - * itself (as is a usual WAIT semantic). - */ - GST_COLLECT_PADS_EVT_INIT (cookie); + g_cond_wait (&pads->priv->pad_cond, &pads->priv->pad_lock); /* pad could be removed and re-added */ unref_data (data); @@ -2200,21 +2184,10 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) ref_data (data); GST_OBJECT_UNLOCK (pad); - GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting", - GST_DEBUG_PAD_NAME (pad)); - - /* wait to be collected, this must happen from another thread triggered - * by the _chain function of another pad. We release the lock so we - * can get stopped or flushed as well. We can however not get EOS - * because we still hold the STREAM_LOCK. - */ - GST_COLLECT_PADS_STREAM_UNLOCK (pads); - GST_COLLECT_PADS_EVT_WAIT (pads, cookie); - GST_COLLECT_PADS_STREAM_LOCK (pads); - GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad)); /* after a signal, we could be stopped */ + g_mutex_lock (&pads->priv->loop_lock); if (G_UNLIKELY (!pads->priv->started)) goto not_started; /* check if this pad is flushing */ @@ -2225,7 +2198,7 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) while (data->buffer != NULL); unlock_done: - GST_COLLECT_PADS_STREAM_UNLOCK (pads); + g_mutex_unlock (&pads->priv->pad_lock); /* data is definitely NULL if pad_removed goto was run. */ if (data) unref_data (data); @@ -2250,6 +2223,7 @@ no_data: } not_started: { + g_mutex_unlock (&pads->priv->loop_lock); GST_DEBUG ("not started"); gst_collect_pads_clear (pads, data); ret = GST_FLOW_FLUSHING; @@ -2285,3 +2259,23 @@ error: goto unlock_done; } } + +static gpointer +gst_collect_pads_loop (gpointer data) +{ + GstCollectPads *pads = data; + +#if 0 + /* Check if our collected condition is matched and call the collected + * function if it is */ + ret = gst_collect_pads_check_collected (pads); + /* when an error occurs, we want to report this back to the caller ASAP + * without having to block if the buffer was not popped */ + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto error; +#endif + + while (TRUE); + + return NULL; +} |