summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2011-06-09 17:33:09 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2011-06-09 17:33:09 +0200
commita728403b4cd41c484e8410dd2811c756bc141ffd (patch)
tree1570196c4df8bfb8d41f8fd3747f812da9a1af35
parentab0ab2fbca05030684c9ca4e2b5acba38c01b3ca (diff)
pad: Beginnings of lockfree data passinglockfree-data
-rw-r--r--gst/gstpad.c382
1 files changed, 278 insertions, 104 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c
index 0d194c052..b04324072 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -105,12 +105,77 @@ typedef struct
GstEvent *event;
} PadEvent;
+
+/* The pad state is like this:
+ *
+ * ..-+-+-+-+-+-+-+
+ * ref |U|P|N|F|
+ * ..-+-+-+-+-+-+-+
+ *
+ * The low bits are flags and the high bits a refcount. Right before pushing
+ * out the buffer to the peer, the flags are checked. If they are 0, the ref is
+ * incremented. After the peer returned, we decrement the counter and check the
+ * flags.
+ *
+ * F = flushing, new pushes return wrong-state
+ * N = needs events
+ * P = has probes
+ * U = unlinked, the peer pad should be unreffed.
+ */
+#define STATE_REFSHIFT 4
+#define STATE_FLAGMASK ((1 << STATE_REFSHIFT) - 1)
+#define STATE_BUSY_COUNT(s) ((s) >> STATE_REFSHIFT)
+#define STATE_BUSY_INC(s) ((s) + (1 << STATE_REFSHIFT))
+#define STATE_BUSY_DEC(s) ((s) - (1 << STATE_REFSHIFT))
+#define STATE_IS_BUSY(s) (((s) & ~STATE_FLAGMASK) != 0)
+#define STATE_FLAGS(s) ((s) & STATE_FLAGMASK)
+#define STATE_HAS_FLAGS(s) (STATE_FLAGS(s) != 0)
+#define STATE_IS_SET(s,f) ((s) & (f))
+#define STATE_SET(s,f) ((s) | (f))
+#define STATE_UNSET(s,f) ((s) & ~(f))
+
+#define DEFAULT_STATE (STATE_FLUSHING | STATE_UNLINKED);
+
+typedef enum
+{
+ STATE_FLUSHING = (1 << 0),
+ STATE_NEEDS_EVENTS = (1 << 1),
+ STATE_PROBES = (1 << 2),
+ STATE_UNLINKED = (1 << 3),
+} PadState;
+
+#define PAD_STATE(p) ((p)->priv->state)
+#define PAD_GET_STATE(p) g_atomic_int_get(&PAD_STATE(p))
+#define PAD_BUSY_DEC(p) g_atomic_int_exchange_and_add(&PAD_STATE(p), -(1 << STATE_REFSHIFT))
+#define PAD_UPDATE_STATE(p,o,n) g_atomic_int_compare_and_exchange (&PAD_STATE(p),(o),(n))
+
+#define PAD_STATE_SET(p,f) \
+ G_STMT_START { \
+ gint state, updated; \
+ do { \
+ state = PAD_GET_STATE (p); \
+ updated = STATE_SET (state, f); \
+ } while (!PAD_UPDATE_STATE (p, state, updated));\
+ } G_STMT_END
+
+#define PAD_STATE_UNSET(p,f) \
+ G_STMT_START { \
+ gint state, updated; \
+ do { \
+ state = PAD_GET_STATE (p); \
+ updated = STATE_UNSET (state, f); \
+ } while (!PAD_UPDATE_STATE (p, state, updated));\
+ } G_STMT_END
+
struct _GstPadPrivate
{
PadEvent events[GST_EVENT_MAX_STICKY];
gint using;
gint probe_cookie;
+
+ volatile gint state;
+ GstPad *peer;
};
typedef struct
@@ -329,6 +394,8 @@ gst_pad_init (GstPad * pad)
pad->block_cond = g_cond_new ();
g_hook_list_init (&pad->probes, sizeof (GstProbe));
+
+ pad->priv->state = DEFAULT_STATE;
}
/* called when setting the pad inactive. It removes all sticky events from
@@ -407,8 +474,10 @@ prepare_event_update (GstPad * srcpad, GstPad * sinkpad)
pending |= replace_event (srcpad, sinkpad, i);
/* we had some new pending events, set our flag */
- if (pending)
+ if (pending) {
GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_NEED_EVENTS);
+ PAD_STATE_SET (sinkpad, STATE_NEEDS_EVENTS);
+ }
}
/* should be called with the OBJECT_LOCK */
@@ -630,6 +699,7 @@ pre_activate (GstPad * pad, GstActivateMode new_mode)
GST_DEBUG_OBJECT (pad, "setting ACTIVATE_MODE %d, unset flushing",
new_mode);
GST_PAD_UNSET_FLUSHING (pad);
+ PAD_STATE_UNSET (pad, STATE_FLUSHING);
GST_PAD_ACTIVATE_MODE (pad) = new_mode;
GST_OBJECT_UNLOCK (pad);
break;
@@ -637,6 +707,7 @@ pre_activate (GstPad * pad, GstActivateMode new_mode)
GST_OBJECT_LOCK (pad);
GST_DEBUG_OBJECT (pad, "setting ACTIVATE_MODE NONE, set flushing");
GST_PAD_SET_FLUSHING (pad);
+ PAD_STATE_SET (pad, STATE_FLUSHING);
GST_PAD_ACTIVATE_MODE (pad) = new_mode;
/* unlock blocked pads so element can resume and stop */
GST_PAD_BLOCK_BROADCAST (pad);
@@ -892,6 +963,7 @@ failure:
GST_CAT_INFO_OBJECT (GST_CAT_PADS, pad, "failed to %s in pull mode",
active ? "activate" : "deactivate");
GST_PAD_SET_FLUSHING (pad);
+ PAD_STATE_SET (pad, STATE_FLUSHING);
GST_PAD_ACTIVATE_MODE (pad) = old;
GST_OBJECT_UNLOCK (pad);
return FALSE;
@@ -997,6 +1069,7 @@ failure:
GST_CAT_INFO_OBJECT (GST_CAT_PADS, pad, "failed to %s in push mode",
active ? "activate" : "deactivate");
GST_PAD_SET_FLUSHING (pad);
+ PAD_STATE_SET (pad, STATE_FLUSHING);
GST_PAD_ACTIVATE_MODE (pad) = old;
GST_OBJECT_UNLOCK (pad);
return FALSE;
@@ -1087,6 +1160,7 @@ gst_pad_add_probe (GstPad * pad, GstProbeType mask,
/* add the probe */
g_hook_prepend (&pad->probes, hook);
pad->num_probes++;
+ PAD_STATE_SET (pad, STATE_PROBES);
/* get the id of the hook, we return this and it can be used to remove the
* probe later */
@@ -1145,6 +1219,8 @@ cleanup_hook (GstPad * pad, GHook * hook)
}
g_hook_destroy_link (&pad->probes, hook);
pad->num_probes--;
+ if (pad->num_probes == 0)
+ PAD_STATE_UNSET (pad, STATE_PROBES);
}
/**
@@ -1679,6 +1755,10 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
GST_PAD_UNLINKFUNC (sinkpad) (sinkpad);
}
+ /* FIXME, do something more clever */
+ PAD_STATE_SET (srcpad, STATE_UNLINKED);
+ PAD_STATE_SET (sinkpad, STATE_UNLINKED);
+
/* first clear peers */
GST_PAD_PEER (srcpad) = NULL;
GST_PAD_PEER (sinkpad) = NULL;
@@ -2030,6 +2110,10 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
if (G_UNLIKELY (result != GST_PAD_LINK_OK))
goto done;
+ /* FIXME, do something more clever */
+ PAD_STATE_SET (srcpad, STATE_UNLINKED);
+ PAD_STATE_SET (sinkpad, STATE_UNLINKED);
+
/* must set peers before calling the link function */
GST_PAD_PEER (srcpad) = sinkpad;
GST_PAD_PEER (sinkpad) = srcpad;
@@ -2713,8 +2797,7 @@ not_accepted:
}
/* function to send all pending events on the sinkpad to the event
- * function and collect the results. This function should be called with
- * the object lock. The object lock might be released by this function.
+ * function and collect the results.
*/
static GstFlowReturn
gst_pad_update_events (GstPad * pad)
@@ -2738,16 +2821,9 @@ gst_pad_update_events (GstPad * pad)
continue;
gst_event_ref (event);
- GST_OBJECT_UNLOCK (pad);
res = do_event_function (pad, event, eventfunc);
- GST_OBJECT_LOCK (pad);
- /* things could have changed while we release the lock, check if we still
- * are handling the same event, if we don't something changed and we have
- * to try again. FIXME. we need a cookie here. FIXME, we also want to remove
- * that lock eventually and then do the retry elsewhere. */
-
if (res) {
/* make the event active */
gst_event_take (&ev->event, event);
@@ -3372,13 +3448,11 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
}
}
-#define PROBE(pad,mask,data,label) \
- G_STMT_START { \
- if (G_UNLIKELY (pad->num_probes)) { \
- ret = do_probe_callbacks (pad, mask, data); \
- if (G_UNLIKELY (ret != GST_FLOW_OK)) \
- goto label; \
- } \
+#define PROBE(pad,mask,data,label) \
+ G_STMT_START { \
+ ret = do_probe_callbacks (pad, mask, data); \
+ if (G_UNLIKELY (ret != GST_FLOW_OK)) \
+ goto label; \
} G_STMT_END
static GstFlowReturn
@@ -3538,8 +3612,10 @@ gst_pad_set_offset (GstPad * pad, gint64 offset)
GST_OBJECT_LOCK (peer);
/* take the current segment event, adjust it and then place
* it on the sinkpad. events on the srcpad are always active. */
- if (replace_event (pad, peer, idx))
+ if (replace_event (pad, peer, idx)) {
GST_OBJECT_FLAG_SET (peer, GST_PAD_NEED_EVENTS);
+ PAD_STATE_SET (peer, STATE_NEEDS_EVENTS);
+ }
GST_OBJECT_UNLOCK (peer);
@@ -3559,28 +3635,38 @@ static inline GstFlowReturn
gst_pad_chain_data_unchecked (GstPad * pad, GstProbeType type, void *data)
{
GstFlowReturn ret;
- gboolean needs_events;
+ PadState state, updated;
GST_PAD_STREAM_LOCK (pad);
- GST_OBJECT_LOCK (pad);
- if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
- goto flushing;
-
- needs_events = GST_PAD_NEEDS_EVENTS (pad);
- if (G_UNLIKELY (needs_events)) {
- GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_EVENTS);
+again:
+ state = PAD_GET_STATE (pad);
+
+ /* no flags, we can just start processing */
+ if (G_UNLIKELY (STATE_HAS_FLAGS (state))) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING)))
+ goto flushing;
+
+ /* try to clear the need events flag */
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_NEEDS_EVENTS))) {
+ updated = STATE_UNSET (state, STATE_NEEDS_EVENTS);
+ if (!PAD_UPDATE_STATE (pad, state, updated))
+ goto again;
+
+ GST_DEBUG_OBJECT (pad, "need to update all events");
+ ret = gst_pad_update_events (pad);
+ if (G_UNLIKELY (ret != GST_FLOW_OK))
+ goto events_error;
+ }
- GST_DEBUG_OBJECT (pad, "need to update all events");
- ret = gst_pad_update_events (pad);
- if (G_UNLIKELY (ret != GST_FLOW_OK))
- goto events_error;
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "doing probes");
+ PROBE (pad, GST_PROBE_TYPE_PUSH | type, data, probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
}
- PROBE (pad, GST_PROBE_TYPE_PUSH | type, data, probe_stopped);
-
- GST_OBJECT_UNLOCK (pad);
-
/* NOTE: we read the chainfunc unlocked.
* we cannot hold the lock for the pad so we might send
* the data to the wrong function. This is not really a
@@ -3627,14 +3713,12 @@ flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"chaining, but pad was flushing");
- GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
return GST_FLOW_WRONG_STATE;
}
events_error:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "events were not accepted");
- GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
return ret;
}
@@ -3769,40 +3853,78 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list)
static GstFlowReturn
gst_pad_push_data (GstPad * pad, GstProbeType type, void *data)
{
- GstPad *peer;
GstFlowReturn ret;
-
- GST_OBJECT_LOCK (pad);
- if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
- goto flushing;
+ PadState state, updated;
+ gboolean flags;
type |= GST_PROBE_TYPE_PUSH;
- /* do block probes */
- PROBE (pad, type | GST_PROBE_TYPE_BLOCK, data, probe_stopped);
+again:
+ state = PAD_GET_STATE (pad);
- /* do post-blocking probes */
- PROBE (pad, type, data, probe_stopped);
+ /* increment the busy counter */
+ updated = STATE_BUSY_INC (state);
- if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
- goto not_linked;
+ flags = STATE_HAS_FLAGS (state);
+ /* check flags first */
+ if (G_UNLIKELY (flags)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING)))
+ goto flushing;
- /* take ref to peer pad before releasing the lock */
- gst_object_ref (peer);
- pad->priv->using++;
- GST_OBJECT_UNLOCK (pad);
+ /* clear unlinked flag */
+ updated = STATE_UNSET (updated, STATE_UNLINKED);
+ }
- ret = gst_pad_chain_data_unchecked (peer, type, data);
+ /* update state now */
+ if (G_UNLIKELY (!PAD_UPDATE_STATE (pad, state, updated)))
+ goto again;
- gst_object_unref (peer);
+ if (G_UNLIKELY (flags)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "doing pad probes");
+ /* do block probes */
+ PROBE (pad, type | GST_PROBE_TYPE_BLOCK, data, probe_stopped);
- GST_OBJECT_LOCK (pad);
- pad->priv->using--;
- if (pad->priv->using == 0) {
- /* pad is not active anymore, trigger idle callbacks */
- PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_IDLE, NULL, probe_stopped);
+ /* do post-blocking probes */
+ PROBE (pad, type, data, probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
+
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) {
+ GstPad *peer;
+
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "getting new peer pad");
+ if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
+ goto not_linked;
+
+ /* take ref to peer pad before releasing the lock */
+ gst_object_replace ((GstObject **) & pad->priv->peer, (GstObject *) peer);
+ GST_OBJECT_UNLOCK (pad);
+ }
+ }
+
+ ret = gst_pad_chain_data_unchecked (pad->priv->peer, type, data);
+
+ state = PAD_BUSY_DEC (pad);
+
+ if (G_UNLIKELY (STATE_HAS_FLAGS (state) && STATE_BUSY_COUNT (state) == 1)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "doing idle probes");
+ /* pad is not active anymore, trigger idle callbacks */
+ PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_IDLE, NULL,
+ probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "clearing unlinked pad");
+ gst_object_replace ((GstObject **) & pad->priv->peer, NULL);
+ GST_OBJECT_UNLOCK (pad);
+ }
}
- GST_OBJECT_UNLOCK (pad);
return ret;
@@ -3812,7 +3934,6 @@ flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but pad was flushing");
- GST_OBJECT_UNLOCK (pad);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
return GST_FLOW_WRONG_STATE;
}
@@ -3836,6 +3957,7 @@ not_linked:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but it was not linked");
+ gst_object_replace ((GstObject **) & pad->priv->peer, NULL);
GST_OBJECT_UNLOCK (pad);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
return GST_FLOW_NOT_LINKED;
@@ -3916,13 +4038,15 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size,
{
GstFlowReturn ret;
GstPadGetRangeFunction getrangefunc;
+ PadState state;
GST_PAD_STREAM_LOCK (pad);
- GST_OBJECT_LOCK (pad);
- if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+ state = PAD_GET_STATE (pad);
+
+ /* no flags, we can just start processing */
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING)))
goto flushing;
- GST_OBJECT_UNLOCK (pad);
if (G_UNLIKELY ((getrangefunc = GST_PAD_GETRANGEFUNC (pad)) == NULL))
goto no_function;
@@ -3934,14 +4058,17 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size,
ret = getrangefunc (pad, offset, size, buffer);
+ /* can only fire the signal if we have a valid buffer */
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto get_range_failed;
- /* can only fire the signal if we have a valid buffer */
- GST_OBJECT_LOCK (pad);
- PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, *buffer,
- probe_stopped);
- GST_OBJECT_UNLOCK (pad);
+ state = PAD_GET_STATE (pad);
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, *buffer,
+ probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
GST_PAD_STREAM_UNLOCK (pad);
@@ -3952,7 +4079,6 @@ flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"getrange, but pad was flushing");
- GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
return GST_FLOW_WRONG_STATE;
}
@@ -4052,56 +4178,101 @@ GstFlowReturn
gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
GstBuffer ** buffer)
{
- GstPad *peer;
GstFlowReturn ret;
- gboolean needs_events;
+ PadState state, updated;
+ gboolean flags;
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
- GST_OBJECT_LOCK (pad);
- if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
- goto flushing;
+again:
+ state = PAD_GET_STATE (pad);
+ /* increment the busy counter */
+ updated = STATE_BUSY_INC (state);
+
+ flags = STATE_HAS_FLAGS (state);
+ /* check flags first */
+ if (G_UNLIKELY (flags)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING)))
+ goto flushing;
+
+ /* clear unlinked flag */
+ updated = STATE_UNSET (updated, STATE_UNLINKED);
+ }
- PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK, NULL,
- pre_probe_stopped);
+ /* update state now */
+ if (G_UNLIKELY (!PAD_UPDATE_STATE (pad, state, updated)))
+ goto again;
- if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
- goto not_linked;
+ if (G_UNLIKELY (flags)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "doing pad probes");
+ /* do block probes */
+ PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK, NULL,
+ pre_probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
- gst_object_ref (peer);
- pad->priv->using++;
- GST_OBJECT_UNLOCK (pad);
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) {
+ GstPad *peer;
- ret = gst_pad_get_range_unchecked (peer, offset, size, buffer);
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "getting new peer pad");
+ if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
+ goto not_linked;
- gst_object_unref (peer);
+ /* take ref to peer pad before releasing the lock */
+ gst_object_replace ((GstObject **) & pad->priv->peer, (GstObject *) peer);
+ GST_OBJECT_UNLOCK (pad);
+ }
+ }
- GST_OBJECT_LOCK (pad);
- pad->priv->using--;
- if (pad->priv->using == 0) {
- /* pad is not active anymore, trigger idle callbacks */
- PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_IDLE, NULL,
- post_probe_stopped);
+ ret = gst_pad_get_range_unchecked (pad->priv->peer, offset, size, buffer);
+
+ state = PAD_BUSY_DEC (pad);
+
+ flags = STATE_HAS_FLAGS (state);
+ if (G_UNLIKELY (flags && STATE_BUSY_COUNT (state) == 1)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "doing idle probes");
+ /* pad is not active anymore, trigger idle callbacks */
+ PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_IDLE, NULL,
+ post_probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "clearing unlinked pad");
+ gst_object_replace ((GstObject **) & pad->priv->peer, NULL);
+ GST_OBJECT_UNLOCK (pad);
+ }
}
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto pull_range_failed;
- PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, buffer,
- post_probe_stopped);
+ if (G_UNLIKELY (flags)) {
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) {
+ GST_OBJECT_LOCK (pad);
+ GST_DEBUG_OBJECT (pad, "doing data probes");
+ PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, buffer,
+ post_probe_stopped);
+ GST_OBJECT_UNLOCK (pad);
+ }
- needs_events = GST_PAD_NEEDS_EVENTS (pad);
- if (G_UNLIKELY (needs_events)) {
- GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_EVENTS);
+ /* try to clear the need events flag */
+ if (G_UNLIKELY (STATE_IS_SET (state, STATE_NEEDS_EVENTS))) {
+ PAD_STATE_UNSET (pad, STATE_NEEDS_EVENTS);
- GST_DEBUG_OBJECT (pad, "we need to update the events");
- ret = gst_pad_update_events (pad);
- if (G_UNLIKELY (ret != GST_FLOW_OK))
- goto events_error;
+ GST_DEBUG_OBJECT (pad, "need to update all events");
+ ret = gst_pad_update_events (pad);
+ if (G_UNLIKELY (ret != GST_FLOW_OK))
+ goto events_error;
+ }
}
- GST_OBJECT_UNLOCK (pad);
return ret;
@@ -4110,7 +4281,6 @@ flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pullrange, but pad was flushing");
- GST_OBJECT_UNLOCK (pad);
return GST_FLOW_WRONG_STATE;
}
pre_probe_stopped:
@@ -4124,13 +4294,13 @@ not_linked:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pulling range, but it was not linked");
+ gst_object_replace ((GstObject **) & pad->priv->peer, NULL);
GST_OBJECT_UNLOCK (pad);
return GST_FLOW_NOT_LINKED;
}
pull_range_failed:
{
*buffer = NULL;
- GST_OBJECT_UNLOCK (pad);
GST_CAT_LEVEL_LOG (GST_CAT_SCHEDULING,
(ret >= GST_FLOW_UNEXPECTED) ? GST_LEVEL_INFO : GST_LEVEL_WARNING,
pad, "pullrange failed, flow: %s", gst_flow_get_name (ret));
@@ -4148,7 +4318,6 @@ post_probe_stopped:
}
events_error:
{
- GST_OBJECT_UNLOCK (pad);
gst_buffer_unref (*buffer);
*buffer = NULL;
GST_CAT_WARNING_OBJECT (GST_CAT_SCHEDULING, pad,
@@ -4197,6 +4366,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
GST_PAD_SET_FLUSHING (pad);
+ PAD_STATE_SET (pad, STATE_FLUSHING);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
/* flush start will have set the FLUSHING flag and will then
@@ -4210,6 +4380,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
break;
case GST_EVENT_FLUSH_STOP:
GST_PAD_UNSET_FLUSHING (pad);
+ PAD_STATE_UNSET (pad, STATE_FLUSHING);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop");
goto flushed;
@@ -4407,12 +4578,14 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
goto flushing;
GST_PAD_SET_FLUSHING (pad);
+ PAD_STATE_SET (pad, STATE_FLUSHING);
GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag");
needs_events = FALSE;
break;
case GST_EVENT_FLUSH_STOP:
if (G_LIKELY (GST_PAD_ACTIVATE_MODE (pad) != GST_ACTIVATE_NONE)) {
GST_PAD_UNSET_FLUSHING (pad);
+ PAD_STATE_UNSET (pad, STATE_FLUSHING);
GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "cleared flush flag");
}
GST_OBJECT_UNLOCK (pad);
@@ -4475,6 +4648,7 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
/* set the flag so that we update the events next time. We would
* usually update below but we might be flushing too. */
GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_EVENTS);
+ PAD_STATE_SET (pad, STATE_NEEDS_EVENTS);
needs_events = TRUE;
}
}
@@ -4492,12 +4666,13 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
GstFlowReturn ret;
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_EVENTS);
+ PAD_STATE_UNSET (pad, STATE_NEEDS_EVENTS);
+ GST_OBJECT_UNLOCK (pad);
GST_DEBUG_OBJECT (pad, "need to update all events");
ret = gst_pad_update_events (pad);
if (ret != GST_FLOW_OK)
goto update_failed;
- GST_OBJECT_UNLOCK (pad);
gst_event_unref (event);
@@ -4567,7 +4742,6 @@ probe_stopped:
}
update_failed:
{
- GST_OBJECT_UNLOCK (pad);
if (need_unlock)
GST_PAD_STREAM_UNLOCK (pad);
GST_CAT_INFO_OBJECT (GST_CAT_EVENT, pad, "Update events failed");