summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gst/gstpad.c134
-rw-r--r--gst/gstpad.h18
-rw-r--r--tests/check/gst/gstpad.c10
3 files changed, 111 insertions, 51 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c
index 5b784e5a7..6d6412140 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -121,7 +121,9 @@ static void gst_pad_set_property (GObject * object, guint prop_id,
static void gst_pad_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
+#if 0
static GstFlowReturn handle_pad_block (GstPad * pad);
+#endif
static GstCaps *gst_pad_get_caps_unlocked (GstPad * pad, GstCaps * filter);
static void gst_pad_set_pad_template (GstPad * pad, GstPadTemplate * templ);
static gboolean gst_pad_activate_default (GstPad * pad);
@@ -1110,30 +1112,24 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked,
GST_OBJECT_LOCK (pad);
- was_blocked = GST_PAD_IS_BLOCKED (pad);
+ /* check if block is busy or pending */
+ was_blocked = GST_PAD_IS_BLOCKED (pad) || GST_PAD_IS_BLOCK_PENDING (pad);
if (G_UNLIKELY (was_blocked == blocked))
goto had_right_state;
- if (blocked) {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
-
- GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
+ if (!blocked) {
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCK_PENDING);
+ /* notify waiters */
+ GST_PAD_BLOCK_BROADCAST (pad);
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
- pad->block_callback = callback;
- pad->block_data = user_data;
- pad->block_destroy_data = destroy_data;
- pad->block_callback_called = FALSE;
- if (!callback) {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
- GST_PAD_BLOCK_WAIT (pad);
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
- }
- } else {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
+ pad->block_callback = NULL;
+ pad->block_data = NULL;
+ pad->block_destroy_data = NULL;
if (GST_PAD_IS_SRC (pad)) {
GstPad *peer;
@@ -1145,26 +1141,48 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked,
GST_OBJECT_UNLOCK (peer);
}
}
+ GST_OBJECT_UNLOCK (pad);
- GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
+ if (callback) {
+ callback (pad, FALSE, user_data);
- if (pad->block_destroy_data && pad->block_data)
- pad->block_destroy_data (pad->block_data);
+ if (destroy_data)
+ destroy_data (user_data);
+ }
+ } else {
+ if (pad->priv->using == 0) {
+ /* nobody is using the pad, we can signal the block immediately */
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
+ GST_OBJECT_UNLOCK (pad);
- pad->block_callback = callback;
- pad->block_data = user_data;
- pad->block_destroy_data = destroy_data;
- pad->block_callback_called = FALSE;
+ if (callback) {
+ callback (pad, TRUE, user_data);
- GST_PAD_BLOCK_BROADCAST (pad);
- if (!callback) {
- /* no callback, wait for the unblock to happen */
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for unblock");
- GST_PAD_BLOCK_WAIT (pad);
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocked");
+ if (destroy_data)
+ destroy_data (user_data);
+ }
+ } else {
+ /* pad still in use, mark pending and save callbacks */
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCK_PENDING);
+
+ if (pad->block_destroy_data && pad->block_data)
+ pad->block_destroy_data (pad->block_data);
+
+ pad->block_callback = callback;
+ pad->block_data = user_data;
+ pad->block_destroy_data = destroy_data;
+
+ if (callback == NULL) {
+ while (!GST_PAD_IS_BLOCKED (pad)) {
+ /* we need to block and wait */
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
+ GST_PAD_BLOCK_WAIT (pad);
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
+ }
+ }
+ GST_OBJECT_UNLOCK (pad);
}
}
- GST_OBJECT_UNLOCK (pad);
return TRUE;
@@ -3436,6 +3454,7 @@ gst_pad_query_default (GstPad * pad, GstQuery * query)
}
}
+#if 0
/*
* should be called with pad OBJECT_LOCK and STREAM_LOCK held.
* GST_PAD_IS_BLOCKED (pad) == TRUE when this function is
@@ -3565,6 +3584,7 @@ flushing:
return GST_FLOW_WRONG_STATE;
}
}
+#endif
/* pad offsets */
@@ -3951,22 +3971,36 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list)
static GstFlowReturn
pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data)
{
- GstFlowReturn ret;
gboolean need_probes, did_probes = FALSE;
again:
GST_OBJECT_LOCK (pad);
- /* FIXME: this check can go away; pad_set_blocked could be implemented with
- * probes completely or probes with an extended pad block. */
- while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
- if ((ret = handle_pad_block (pad)) != GST_FLOW_OK)
- goto flushed;
+ do {
+ if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+ goto flushing;
+
+ /* FIXME: this check can go away; pad_set_blocked could be implemented with
+ * probes completely or probes with an extended pad block. */
+ if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
+ break;
+
+ /* now we block the streaming thread. It can be unlocked when we
+ * deactivate the pad (which will also set the FLUSHING flag) or
+ * when the pad is unblocked. A flushing event will also unblock
+ * the pad after setting the FLUSHING flag. */
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "Waiting to be unblocked or set flushing");
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
+ GST_PAD_BLOCK_WAIT (pad);
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
+ } while (TRUE);
need_probes = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes && !did_probes)) {
+ /* don't do probes next time */
did_probes = TRUE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
@@ -3976,6 +4010,7 @@ again:
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
goto dropped;
+ /* we released the lock, recheck everything */
goto again;
}
@@ -3990,11 +4025,11 @@ again:
return GST_FLOW_OK;
/* ERRORS */
-flushed:
+flushing:
{
- GST_DEBUG_OBJECT (pad, "pad block stopped by flush");
+ GST_DEBUG_OBJECT (pad, "we are flushing");
GST_OBJECT_UNLOCK (pad);
- return ret;
+ return GST_FLOW_WRONG_STATE;
}
dropped:
{
@@ -4015,6 +4050,25 @@ pad_post_push (GstPad * pad)
{
GST_OBJECT_LOCK (pad);
pad->priv->using--;
+ if (pad->priv->using == 0) {
+ GstPadBlockCallback callback;
+ gpointer user_data;
+
+ /* pad is not active anymore, check if we need to trigger the block */
+ if (GST_PAD_IS_BLOCK_PENDING (pad)) {
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCK_PENDING);
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
+
+ callback = pad->block_callback;
+ user_data = pad->block_data;
+ GST_PAD_BLOCK_BROADCAST (pad);
+ GST_OBJECT_UNLOCK (pad);
+
+ if (callback)
+ callback (pad, TRUE, user_data);
+ GST_OBJECT_LOCK (pad);
+ }
+ }
GST_OBJECT_UNLOCK (pad);
}
@@ -4289,8 +4343,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
GST_OBJECT_LOCK (pad);
+#if 0
while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
handle_pad_block (pad);
+#endif
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_connected;
diff --git a/gst/gstpad.h b/gst/gstpad.h
index 970e1ef51..28424299c 100644
--- a/gst/gstpad.h
+++ b/gst/gstpad.h
@@ -527,14 +527,15 @@ typedef enum {
* Pad state flags
*/
typedef enum {
- GST_PAD_BLOCKED = (GST_OBJECT_FLAG_LAST << 0),
- GST_PAD_FLUSHING = (GST_OBJECT_FLAG_LAST << 1),
- GST_PAD_IN_GETCAPS = (GST_OBJECT_FLAG_LAST << 2),
- GST_PAD_IN_SETCAPS = (GST_OBJECT_FLAG_LAST << 3),
- GST_PAD_BLOCKING = (GST_OBJECT_FLAG_LAST << 4),
- GST_PAD_NEED_RECONFIGURE = (GST_OBJECT_FLAG_LAST << 5),
- GST_PAD_NEED_EVENTS = (GST_OBJECT_FLAG_LAST << 6),
- GST_PAD_FIXED_CAPS = (GST_OBJECT_FLAG_LAST << 7),
+ GST_PAD_FLUSHING = (GST_OBJECT_FLAG_LAST << 2),
+ GST_PAD_BLOCK_PENDING = (GST_OBJECT_FLAG_LAST << 0),
+ GST_PAD_BLOCKED = (GST_OBJECT_FLAG_LAST << 1),
+ GST_PAD_BLOCKING = (GST_OBJECT_FLAG_LAST << 5),
+ GST_PAD_IN_GETCAPS = (GST_OBJECT_FLAG_LAST << 3),
+ GST_PAD_IN_SETCAPS = (GST_OBJECT_FLAG_LAST << 4),
+ GST_PAD_NEED_RECONFIGURE = (GST_OBJECT_FLAG_LAST << 6),
+ GST_PAD_NEED_EVENTS = (GST_OBJECT_FLAG_LAST << 7),
+ GST_PAD_FIXED_CAPS = (GST_OBJECT_FLAG_LAST << 8),
/* padding */
GST_PAD_FLAG_LAST = (GST_OBJECT_FLAG_LAST << 16)
} GstPadFlags;
@@ -701,6 +702,7 @@ struct _GstPadClass {
#define GST_PAD_IS_LINKED(pad) (GST_PAD_PEER(pad) != NULL)
+#define GST_PAD_IS_BLOCK_PENDING(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_BLOCK_PENDING))
#define GST_PAD_IS_BLOCKED(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_BLOCKED))
#define GST_PAD_IS_BLOCKING(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_BLOCKING))
#define GST_PAD_IS_FLUSHING(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLUSHING))
diff --git a/tests/check/gst/gstpad.c b/tests/check/gst/gstpad.c
index 8d0bbbf3e..bfbc919e6 100644
--- a/tests/check/gst/gstpad.c
+++ b/tests/check/gst/gstpad.c
@@ -276,6 +276,8 @@ GST_START_TEST (test_push_unlinked)
gst_pad_set_caps (src, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+ gst_pad_set_active (src, TRUE);
+
/* pushing on an unlinked pad will drop the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
@@ -720,9 +722,8 @@ GST_START_TEST (test_block_async)
gst_pad_set_active (pad, TRUE);
gst_pad_set_blocked (pad, TRUE, block_async_cb, &data, NULL);
- fail_unless (data[0] == FALSE);
- fail_unless (data[1] == FALSE);
- gst_pad_push (pad, gst_buffer_new ());
+ fail_unless (data[0] == TRUE);
+ fail_unless (data[1] == TRUE);
gst_object_unref (pad);
}
@@ -801,7 +802,7 @@ block_async_full_cb (GstPad * pad, gboolean blocked, gpointer user_data)
*(gint *) user_data = (gint) blocked;
gst_pad_push_event (pad, gst_event_new_flush_start ());
- GST_DEBUG ("setting state to 1");
+ GST_DEBUG ("setting state to %d", blocked);
}
GST_START_TEST (test_block_async_full_destroy)
@@ -817,6 +818,7 @@ GST_START_TEST (test_block_async_full_destroy)
gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 0);
+ fail_unless (gst_pad_is_blocked (pad));
gst_pad_push (pad, gst_buffer_new ());
/* block_async_full_cb sets state to 1 and then flushes to unblock temporarily