diff options
-rw-r--r-- | gst/gstpad.c | 134 | ||||
-rw-r--r-- | gst/gstpad.h | 18 | ||||
-rw-r--r-- | tests/check/gst/gstpad.c | 10 |
3 files changed, 111 insertions, 51 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c index 5b784e5a7..6d6412140 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -121,7 +121,9 @@ static void gst_pad_set_property (GObject * object, guint prop_id, static void gst_pad_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); +#if 0 static GstFlowReturn handle_pad_block (GstPad * pad); +#endif static GstCaps *gst_pad_get_caps_unlocked (GstPad * pad, GstCaps * filter); static void gst_pad_set_pad_template (GstPad * pad, GstPadTemplate * templ); static gboolean gst_pad_activate_default (GstPad * pad); @@ -1110,30 +1112,24 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked, GST_OBJECT_LOCK (pad); - was_blocked = GST_PAD_IS_BLOCKED (pad); + /* check if block is busy or pending */ + was_blocked = GST_PAD_IS_BLOCKED (pad) || GST_PAD_IS_BLOCK_PENDING (pad); if (G_UNLIKELY (was_blocked == blocked)) goto had_right_state; - if (blocked) { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad"); - - GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); + if (!blocked) { + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCK_PENDING); + /* notify waiters */ + GST_PAD_BLOCK_BROADCAST (pad); if (pad->block_destroy_data && pad->block_data) pad->block_destroy_data (pad->block_data); - pad->block_callback = callback; - pad->block_data = user_data; - pad->block_destroy_data = destroy_data; - pad->block_callback_called = FALSE; - if (!callback) { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block"); - GST_PAD_BLOCK_WAIT (pad); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked"); - } - } else { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad"); + pad->block_callback = NULL; + pad->block_data = NULL; + pad->block_destroy_data = NULL; if (GST_PAD_IS_SRC (pad)) { GstPad *peer; @@ -1145,26 +1141,48 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked, GST_OBJECT_UNLOCK (peer); } } + GST_OBJECT_UNLOCK (pad); - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED); + if (callback) { + callback (pad, FALSE, user_data); - if (pad->block_destroy_data && pad->block_data) - pad->block_destroy_data (pad->block_data); + if (destroy_data) + destroy_data (user_data); + } + } else { + if (pad->priv->using == 0) { + /* nobody is using the pad, we can signal the block immediately */ + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); + GST_OBJECT_UNLOCK (pad); - pad->block_callback = callback; - pad->block_data = user_data; - pad->block_destroy_data = destroy_data; - pad->block_callback_called = FALSE; + if (callback) { + callback (pad, TRUE, user_data); - GST_PAD_BLOCK_BROADCAST (pad); - if (!callback) { - /* no callback, wait for the unblock to happen */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for unblock"); - GST_PAD_BLOCK_WAIT (pad); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocked"); + if (destroy_data) + destroy_data (user_data); + } + } else { + /* pad still in use, mark pending and save callbacks */ + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCK_PENDING); + + if (pad->block_destroy_data && pad->block_data) + pad->block_destroy_data (pad->block_data); + + pad->block_callback = callback; + pad->block_data = user_data; + pad->block_destroy_data = destroy_data; + + if (callback == NULL) { + while (!GST_PAD_IS_BLOCKED (pad)) { + /* we need to block and wait */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block"); + GST_PAD_BLOCK_WAIT (pad); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked"); + } + } + GST_OBJECT_UNLOCK (pad); } } - GST_OBJECT_UNLOCK (pad); return TRUE; @@ -3436,6 +3454,7 @@ gst_pad_query_default (GstPad * pad, GstQuery * query) } } +#if 0 /* * should be called with pad OBJECT_LOCK and STREAM_LOCK held. * GST_PAD_IS_BLOCKED (pad) == TRUE when this function is @@ -3565,6 +3584,7 @@ flushing: return GST_FLOW_WRONG_STATE; } } +#endif /* pad offsets */ @@ -3951,22 +3971,36 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list) static GstFlowReturn pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data) { - GstFlowReturn ret; gboolean need_probes, did_probes = FALSE; again: GST_OBJECT_LOCK (pad); - /* FIXME: this check can go away; pad_set_blocked could be implemented with - * probes completely or probes with an extended pad block. */ - while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) - if ((ret = handle_pad_block (pad)) != GST_FLOW_OK) - goto flushed; + do { + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; + + /* FIXME: this check can go away; pad_set_blocked could be implemented with + * probes completely or probes with an extended pad block. */ + if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad))) + break; + + /* now we block the streaming thread. It can be unlocked when we + * deactivate the pad (which will also set the FLUSHING flag) or + * when the pad is unblocked. A flushing event will also unblock + * the pad after setting the FLUSHING flag. */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "Waiting to be unblocked or set flushing"); + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); + GST_PAD_BLOCK_WAIT (pad); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); + } while (TRUE); need_probes = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0; /* we emit signals on the pad arg, the peer will have a chance to * emit in the _chain() function */ if (G_UNLIKELY (need_probes && !did_probes)) { + /* don't do probes next time */ did_probes = TRUE; /* unlock before emitting */ GST_OBJECT_UNLOCK (pad); @@ -3976,6 +4010,7 @@ again: if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data))) goto dropped; + /* we released the lock, recheck everything */ goto again; } @@ -3990,11 +4025,11 @@ again: return GST_FLOW_OK; /* ERRORS */ -flushed: +flushing: { - GST_DEBUG_OBJECT (pad, "pad block stopped by flush"); + GST_DEBUG_OBJECT (pad, "we are flushing"); GST_OBJECT_UNLOCK (pad); - return ret; + return GST_FLOW_WRONG_STATE; } dropped: { @@ -4015,6 +4050,25 @@ pad_post_push (GstPad * pad) { GST_OBJECT_LOCK (pad); pad->priv->using--; + if (pad->priv->using == 0) { + GstPadBlockCallback callback; + gpointer user_data; + + /* pad is not active anymore, check if we need to trigger the block */ + if (GST_PAD_IS_BLOCK_PENDING (pad)) { + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCK_PENDING); + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); + + callback = pad->block_callback; + user_data = pad->block_data; + GST_PAD_BLOCK_BROADCAST (pad); + GST_OBJECT_UNLOCK (pad); + + if (callback) + callback (pad, TRUE, user_data); + GST_OBJECT_LOCK (pad); + } + } GST_OBJECT_UNLOCK (pad); } @@ -4289,8 +4343,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, GST_OBJECT_LOCK (pad); +#if 0 while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) handle_pad_block (pad); +#endif if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) goto not_connected; diff --git a/gst/gstpad.h b/gst/gstpad.h index 970e1ef51..28424299c 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -527,14 +527,15 @@ typedef enum { * Pad state flags */ typedef enum { - GST_PAD_BLOCKED = (GST_OBJECT_FLAG_LAST << 0), - GST_PAD_FLUSHING = (GST_OBJECT_FLAG_LAST << 1), - GST_PAD_IN_GETCAPS = (GST_OBJECT_FLAG_LAST << 2), - GST_PAD_IN_SETCAPS = (GST_OBJECT_FLAG_LAST << 3), - GST_PAD_BLOCKING = (GST_OBJECT_FLAG_LAST << 4), - GST_PAD_NEED_RECONFIGURE = (GST_OBJECT_FLAG_LAST << 5), - GST_PAD_NEED_EVENTS = (GST_OBJECT_FLAG_LAST << 6), - GST_PAD_FIXED_CAPS = (GST_OBJECT_FLAG_LAST << 7), + GST_PAD_FLUSHING = (GST_OBJECT_FLAG_LAST << 2), + GST_PAD_BLOCK_PENDING = (GST_OBJECT_FLAG_LAST << 0), + GST_PAD_BLOCKED = (GST_OBJECT_FLAG_LAST << 1), + GST_PAD_BLOCKING = (GST_OBJECT_FLAG_LAST << 5), + GST_PAD_IN_GETCAPS = (GST_OBJECT_FLAG_LAST << 3), + GST_PAD_IN_SETCAPS = (GST_OBJECT_FLAG_LAST << 4), + GST_PAD_NEED_RECONFIGURE = (GST_OBJECT_FLAG_LAST << 6), + GST_PAD_NEED_EVENTS = (GST_OBJECT_FLAG_LAST << 7), + GST_PAD_FIXED_CAPS = (GST_OBJECT_FLAG_LAST << 8), /* padding */ GST_PAD_FLAG_LAST = (GST_OBJECT_FLAG_LAST << 16) } GstPadFlags; @@ -701,6 +702,7 @@ struct _GstPadClass { #define GST_PAD_IS_LINKED(pad) (GST_PAD_PEER(pad) != NULL) +#define GST_PAD_IS_BLOCK_PENDING(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_BLOCK_PENDING)) #define GST_PAD_IS_BLOCKED(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_BLOCKED)) #define GST_PAD_IS_BLOCKING(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_BLOCKING)) #define GST_PAD_IS_FLUSHING(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLUSHING)) diff --git a/tests/check/gst/gstpad.c b/tests/check/gst/gstpad.c index 8d0bbbf3e..bfbc919e6 100644 --- a/tests/check/gst/gstpad.c +++ b/tests/check/gst/gstpad.c @@ -276,6 +276,8 @@ GST_START_TEST (test_push_unlinked) gst_pad_set_caps (src, caps); ASSERT_CAPS_REFCOUNT (caps, "caps", 2); + gst_pad_set_active (src, TRUE); + /* pushing on an unlinked pad will drop the buffer */ buffer = gst_buffer_new (); gst_buffer_ref (buffer); @@ -720,9 +722,8 @@ GST_START_TEST (test_block_async) gst_pad_set_active (pad, TRUE); gst_pad_set_blocked (pad, TRUE, block_async_cb, &data, NULL); - fail_unless (data[0] == FALSE); - fail_unless (data[1] == FALSE); - gst_pad_push (pad, gst_buffer_new ()); + fail_unless (data[0] == TRUE); + fail_unless (data[1] == TRUE); gst_object_unref (pad); } @@ -801,7 +802,7 @@ block_async_full_cb (GstPad * pad, gboolean blocked, gpointer user_data) *(gint *) user_data = (gint) blocked; gst_pad_push_event (pad, gst_event_new_flush_start ()); - GST_DEBUG ("setting state to 1"); + GST_DEBUG ("setting state to %d", blocked); } GST_START_TEST (test_block_async_full_destroy) @@ -817,6 +818,7 @@ GST_START_TEST (test_block_async_full_destroy) gst_pad_set_blocked (pad, TRUE, block_async_full_cb, &state, block_async_full_destroy); fail_unless (state == 0); + fail_unless (gst_pad_is_blocked (pad)); gst_pad_push (pad, gst_buffer_new ()); /* block_async_full_cb sets state to 1 and then flushes to unblock temporarily |