diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2011-06-09 17:33:09 +0200 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2011-06-09 17:33:09 +0200 |
commit | a728403b4cd41c484e8410dd2811c756bc141ffd (patch) | |
tree | 1570196c4df8bfb8d41f8fd3747f812da9a1af35 | |
parent | ab0ab2fbca05030684c9ca4e2b5acba38c01b3ca (diff) |
pad: Beginnings of lockfree data passinglockfree-data
-rw-r--r-- | gst/gstpad.c | 382 |
1 files changed, 278 insertions, 104 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c index 0d194c052..b04324072 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -105,12 +105,77 @@ typedef struct GstEvent *event; } PadEvent; + +/* The pad state is like this: + * + * ..-+-+-+-+-+-+-+ + * ref |U|P|N|F| + * ..-+-+-+-+-+-+-+ + * + * The low bits are flags and the high bits a refcount. Right before pushing + * out the buffer to the peer, the flags are checked. If they are 0, the ref is + * incremented. After the peer returned, we decrement the counter and check the + * flags. + * + * F = flushing, new pushes return wrong-state + * N = needs events + * P = has probes + * U = unlinked, the peer pad should be unreffed. + */ +#define STATE_REFSHIFT 4 +#define STATE_FLAGMASK ((1 << STATE_REFSHIFT) - 1) +#define STATE_BUSY_COUNT(s) ((s) >> STATE_REFSHIFT) +#define STATE_BUSY_INC(s) ((s) + (1 << STATE_REFSHIFT)) +#define STATE_BUSY_DEC(s) ((s) - (1 << STATE_REFSHIFT)) +#define STATE_IS_BUSY(s) (((s) & ~STATE_FLAGMASK) != 0) +#define STATE_FLAGS(s) ((s) & STATE_FLAGMASK) +#define STATE_HAS_FLAGS(s) (STATE_FLAGS(s) != 0) +#define STATE_IS_SET(s,f) ((s) & (f)) +#define STATE_SET(s,f) ((s) | (f)) +#define STATE_UNSET(s,f) ((s) & ~(f)) + +#define DEFAULT_STATE (STATE_FLUSHING | STATE_UNLINKED); + +typedef enum +{ + STATE_FLUSHING = (1 << 0), + STATE_NEEDS_EVENTS = (1 << 1), + STATE_PROBES = (1 << 2), + STATE_UNLINKED = (1 << 3), +} PadState; + +#define PAD_STATE(p) ((p)->priv->state) +#define PAD_GET_STATE(p) g_atomic_int_get(&PAD_STATE(p)) +#define PAD_BUSY_DEC(p) g_atomic_int_exchange_and_add(&PAD_STATE(p), -(1 << STATE_REFSHIFT)) +#define PAD_UPDATE_STATE(p,o,n) g_atomic_int_compare_and_exchange (&PAD_STATE(p),(o),(n)) + +#define PAD_STATE_SET(p,f) \ + G_STMT_START { \ + gint state, updated; \ + do { \ + state = PAD_GET_STATE (p); \ + updated = STATE_SET (state, f); \ + } while (!PAD_UPDATE_STATE (p, state, updated));\ + } G_STMT_END + +#define PAD_STATE_UNSET(p,f) \ + G_STMT_START { \ + gint state, updated; \ + do { \ + state = PAD_GET_STATE (p); \ + updated = STATE_UNSET (state, f); \ + } while (!PAD_UPDATE_STATE (p, state, updated));\ + } G_STMT_END + struct _GstPadPrivate { PadEvent events[GST_EVENT_MAX_STICKY]; gint using; gint probe_cookie; + + volatile gint state; + GstPad *peer; }; typedef struct @@ -329,6 +394,8 @@ gst_pad_init (GstPad * pad) pad->block_cond = g_cond_new (); g_hook_list_init (&pad->probes, sizeof (GstProbe)); + + pad->priv->state = DEFAULT_STATE; } /* called when setting the pad inactive. It removes all sticky events from @@ -407,8 +474,10 @@ prepare_event_update (GstPad * srcpad, GstPad * sinkpad) pending |= replace_event (srcpad, sinkpad, i); /* we had some new pending events, set our flag */ - if (pending) + if (pending) { GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_NEED_EVENTS); + PAD_STATE_SET (sinkpad, STATE_NEEDS_EVENTS); + } } /* should be called with the OBJECT_LOCK */ @@ -630,6 +699,7 @@ pre_activate (GstPad * pad, GstActivateMode new_mode) GST_DEBUG_OBJECT (pad, "setting ACTIVATE_MODE %d, unset flushing", new_mode); GST_PAD_UNSET_FLUSHING (pad); + PAD_STATE_UNSET (pad, STATE_FLUSHING); GST_PAD_ACTIVATE_MODE (pad) = new_mode; GST_OBJECT_UNLOCK (pad); break; @@ -637,6 +707,7 @@ pre_activate (GstPad * pad, GstActivateMode new_mode) GST_OBJECT_LOCK (pad); GST_DEBUG_OBJECT (pad, "setting ACTIVATE_MODE NONE, set flushing"); GST_PAD_SET_FLUSHING (pad); + PAD_STATE_SET (pad, STATE_FLUSHING); GST_PAD_ACTIVATE_MODE (pad) = new_mode; /* unlock blocked pads so element can resume and stop */ GST_PAD_BLOCK_BROADCAST (pad); @@ -892,6 +963,7 @@ failure: GST_CAT_INFO_OBJECT (GST_CAT_PADS, pad, "failed to %s in pull mode", active ? "activate" : "deactivate"); GST_PAD_SET_FLUSHING (pad); + PAD_STATE_SET (pad, STATE_FLUSHING); GST_PAD_ACTIVATE_MODE (pad) = old; GST_OBJECT_UNLOCK (pad); return FALSE; @@ -997,6 +1069,7 @@ failure: GST_CAT_INFO_OBJECT (GST_CAT_PADS, pad, "failed to %s in push mode", active ? "activate" : "deactivate"); GST_PAD_SET_FLUSHING (pad); + PAD_STATE_SET (pad, STATE_FLUSHING); GST_PAD_ACTIVATE_MODE (pad) = old; GST_OBJECT_UNLOCK (pad); return FALSE; @@ -1087,6 +1160,7 @@ gst_pad_add_probe (GstPad * pad, GstProbeType mask, /* add the probe */ g_hook_prepend (&pad->probes, hook); pad->num_probes++; + PAD_STATE_SET (pad, STATE_PROBES); /* get the id of the hook, we return this and it can be used to remove the * probe later */ @@ -1145,6 +1219,8 @@ cleanup_hook (GstPad * pad, GHook * hook) } g_hook_destroy_link (&pad->probes, hook); pad->num_probes--; + if (pad->num_probes == 0) + PAD_STATE_UNSET (pad, STATE_PROBES); } /** @@ -1679,6 +1755,10 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad) GST_PAD_UNLINKFUNC (sinkpad) (sinkpad); } + /* FIXME, do something more clever */ + PAD_STATE_SET (srcpad, STATE_UNLINKED); + PAD_STATE_SET (sinkpad, STATE_UNLINKED); + /* first clear peers */ GST_PAD_PEER (srcpad) = NULL; GST_PAD_PEER (sinkpad) = NULL; @@ -2030,6 +2110,10 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags) if (G_UNLIKELY (result != GST_PAD_LINK_OK)) goto done; + /* FIXME, do something more clever */ + PAD_STATE_SET (srcpad, STATE_UNLINKED); + PAD_STATE_SET (sinkpad, STATE_UNLINKED); + /* must set peers before calling the link function */ GST_PAD_PEER (srcpad) = sinkpad; GST_PAD_PEER (sinkpad) = srcpad; @@ -2713,8 +2797,7 @@ not_accepted: } /* function to send all pending events on the sinkpad to the event - * function and collect the results. This function should be called with - * the object lock. The object lock might be released by this function. + * function and collect the results. */ static GstFlowReturn gst_pad_update_events (GstPad * pad) @@ -2738,16 +2821,9 @@ gst_pad_update_events (GstPad * pad) continue; gst_event_ref (event); - GST_OBJECT_UNLOCK (pad); res = do_event_function (pad, event, eventfunc); - GST_OBJECT_LOCK (pad); - /* things could have changed while we release the lock, check if we still - * are handling the same event, if we don't something changed and we have - * to try again. FIXME. we need a cookie here. FIXME, we also want to remove - * that lock eventually and then do the retry elsewhere. */ - if (res) { /* make the event active */ gst_event_take (&ev->event, event); @@ -3372,13 +3448,11 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data) } } -#define PROBE(pad,mask,data,label) \ - G_STMT_START { \ - if (G_UNLIKELY (pad->num_probes)) { \ - ret = do_probe_callbacks (pad, mask, data); \ - if (G_UNLIKELY (ret != GST_FLOW_OK)) \ - goto label; \ - } \ +#define PROBE(pad,mask,data,label) \ + G_STMT_START { \ + ret = do_probe_callbacks (pad, mask, data); \ + if (G_UNLIKELY (ret != GST_FLOW_OK)) \ + goto label; \ } G_STMT_END static GstFlowReturn @@ -3538,8 +3612,10 @@ gst_pad_set_offset (GstPad * pad, gint64 offset) GST_OBJECT_LOCK (peer); /* take the current segment event, adjust it and then place * it on the sinkpad. events on the srcpad are always active. */ - if (replace_event (pad, peer, idx)) + if (replace_event (pad, peer, idx)) { GST_OBJECT_FLAG_SET (peer, GST_PAD_NEED_EVENTS); + PAD_STATE_SET (peer, STATE_NEEDS_EVENTS); + } GST_OBJECT_UNLOCK (peer); @@ -3559,28 +3635,38 @@ static inline GstFlowReturn gst_pad_chain_data_unchecked (GstPad * pad, GstProbeType type, void *data) { GstFlowReturn ret; - gboolean needs_events; + PadState state, updated; GST_PAD_STREAM_LOCK (pad); - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) - goto flushing; - - needs_events = GST_PAD_NEEDS_EVENTS (pad); - if (G_UNLIKELY (needs_events)) { - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_EVENTS); +again: + state = PAD_GET_STATE (pad); + + /* no flags, we can just start processing */ + if (G_UNLIKELY (STATE_HAS_FLAGS (state))) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING))) + goto flushing; + + /* try to clear the need events flag */ + if (G_UNLIKELY (STATE_IS_SET (state, STATE_NEEDS_EVENTS))) { + updated = STATE_UNSET (state, STATE_NEEDS_EVENTS); + if (!PAD_UPDATE_STATE (pad, state, updated)) + goto again; + + GST_DEBUG_OBJECT (pad, "need to update all events"); + ret = gst_pad_update_events (pad); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto events_error; + } - GST_DEBUG_OBJECT (pad, "need to update all events"); - ret = gst_pad_update_events (pad); - if (G_UNLIKELY (ret != GST_FLOW_OK)) - goto events_error; + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "doing probes"); + PROBE (pad, GST_PROBE_TYPE_PUSH | type, data, probe_stopped); + GST_OBJECT_UNLOCK (pad); + } } - PROBE (pad, GST_PROBE_TYPE_PUSH | type, data, probe_stopped); - - GST_OBJECT_UNLOCK (pad); - /* NOTE: we read the chainfunc unlocked. * we cannot hold the lock for the pad so we might send * the data to the wrong function. This is not really a @@ -3627,14 +3713,12 @@ flushing: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "chaining, but pad was flushing"); - GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_UNLOCK (pad); return GST_FLOW_WRONG_STATE; } events_error: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "events were not accepted"); - GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_UNLOCK (pad); return ret; } @@ -3769,40 +3853,78 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list) static GstFlowReturn gst_pad_push_data (GstPad * pad, GstProbeType type, void *data) { - GstPad *peer; GstFlowReturn ret; - - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) - goto flushing; + PadState state, updated; + gboolean flags; type |= GST_PROBE_TYPE_PUSH; - /* do block probes */ - PROBE (pad, type | GST_PROBE_TYPE_BLOCK, data, probe_stopped); +again: + state = PAD_GET_STATE (pad); - /* do post-blocking probes */ - PROBE (pad, type, data, probe_stopped); + /* increment the busy counter */ + updated = STATE_BUSY_INC (state); - if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) - goto not_linked; + flags = STATE_HAS_FLAGS (state); + /* check flags first */ + if (G_UNLIKELY (flags)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING))) + goto flushing; - /* take ref to peer pad before releasing the lock */ - gst_object_ref (peer); - pad->priv->using++; - GST_OBJECT_UNLOCK (pad); + /* clear unlinked flag */ + updated = STATE_UNSET (updated, STATE_UNLINKED); + } - ret = gst_pad_chain_data_unchecked (peer, type, data); + /* update state now */ + if (G_UNLIKELY (!PAD_UPDATE_STATE (pad, state, updated))) + goto again; - gst_object_unref (peer); + if (G_UNLIKELY (flags)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "doing pad probes"); + /* do block probes */ + PROBE (pad, type | GST_PROBE_TYPE_BLOCK, data, probe_stopped); - GST_OBJECT_LOCK (pad); - pad->priv->using--; - if (pad->priv->using == 0) { - /* pad is not active anymore, trigger idle callbacks */ - PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_IDLE, NULL, probe_stopped); + /* do post-blocking probes */ + PROBE (pad, type, data, probe_stopped); + GST_OBJECT_UNLOCK (pad); + } + + if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) { + GstPad *peer; + + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "getting new peer pad"); + if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) + goto not_linked; + + /* take ref to peer pad before releasing the lock */ + gst_object_replace ((GstObject **) & pad->priv->peer, (GstObject *) peer); + GST_OBJECT_UNLOCK (pad); + } + } + + ret = gst_pad_chain_data_unchecked (pad->priv->peer, type, data); + + state = PAD_BUSY_DEC (pad); + + if (G_UNLIKELY (STATE_HAS_FLAGS (state) && STATE_BUSY_COUNT (state) == 1)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "doing idle probes"); + /* pad is not active anymore, trigger idle callbacks */ + PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_IDLE, NULL, + probe_stopped); + GST_OBJECT_UNLOCK (pad); + } + if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "clearing unlinked pad"); + gst_object_replace ((GstObject **) & pad->priv->peer, NULL); + GST_OBJECT_UNLOCK (pad); + } } - GST_OBJECT_UNLOCK (pad); return ret; @@ -3812,7 +3934,6 @@ flushing: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pushing, but pad was flushing"); - GST_OBJECT_UNLOCK (pad); gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return GST_FLOW_WRONG_STATE; } @@ -3836,6 +3957,7 @@ not_linked: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pushing, but it was not linked"); + gst_object_replace ((GstObject **) & pad->priv->peer, NULL); GST_OBJECT_UNLOCK (pad); gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return GST_FLOW_NOT_LINKED; @@ -3916,13 +4038,15 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size, { GstFlowReturn ret; GstPadGetRangeFunction getrangefunc; + PadState state; GST_PAD_STREAM_LOCK (pad); - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + state = PAD_GET_STATE (pad); + + /* no flags, we can just start processing */ + if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING))) goto flushing; - GST_OBJECT_UNLOCK (pad); if (G_UNLIKELY ((getrangefunc = GST_PAD_GETRANGEFUNC (pad)) == NULL)) goto no_function; @@ -3934,14 +4058,17 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size, ret = getrangefunc (pad, offset, size, buffer); + /* can only fire the signal if we have a valid buffer */ if (G_UNLIKELY (ret != GST_FLOW_OK)) goto get_range_failed; - /* can only fire the signal if we have a valid buffer */ - GST_OBJECT_LOCK (pad); - PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, *buffer, - probe_stopped); - GST_OBJECT_UNLOCK (pad); + state = PAD_GET_STATE (pad); + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, *buffer, + probe_stopped); + GST_OBJECT_UNLOCK (pad); + } GST_PAD_STREAM_UNLOCK (pad); @@ -3952,7 +4079,6 @@ flushing: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "getrange, but pad was flushing"); - GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_UNLOCK (pad); return GST_FLOW_WRONG_STATE; } @@ -4052,56 +4178,101 @@ GstFlowReturn gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, GstBuffer ** buffer) { - GstPad *peer; GstFlowReturn ret; - gboolean needs_events; + PadState state, updated; + gboolean flags; g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR); g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) - goto flushing; +again: + state = PAD_GET_STATE (pad); + /* increment the busy counter */ + updated = STATE_BUSY_INC (state); + + flags = STATE_HAS_FLAGS (state); + /* check flags first */ + if (G_UNLIKELY (flags)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_FLUSHING))) + goto flushing; + + /* clear unlinked flag */ + updated = STATE_UNSET (updated, STATE_UNLINKED); + } - PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK, NULL, - pre_probe_stopped); + /* update state now */ + if (G_UNLIKELY (!PAD_UPDATE_STATE (pad, state, updated))) + goto again; - if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) - goto not_linked; + if (G_UNLIKELY (flags)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "doing pad probes"); + /* do block probes */ + PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK, NULL, + pre_probe_stopped); + GST_OBJECT_UNLOCK (pad); + } - gst_object_ref (peer); - pad->priv->using++; - GST_OBJECT_UNLOCK (pad); + if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) { + GstPad *peer; - ret = gst_pad_get_range_unchecked (peer, offset, size, buffer); + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "getting new peer pad"); + if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) + goto not_linked; - gst_object_unref (peer); + /* take ref to peer pad before releasing the lock */ + gst_object_replace ((GstObject **) & pad->priv->peer, (GstObject *) peer); + GST_OBJECT_UNLOCK (pad); + } + } - GST_OBJECT_LOCK (pad); - pad->priv->using--; - if (pad->priv->using == 0) { - /* pad is not active anymore, trigger idle callbacks */ - PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_IDLE, NULL, - post_probe_stopped); + ret = gst_pad_get_range_unchecked (pad->priv->peer, offset, size, buffer); + + state = PAD_BUSY_DEC (pad); + + flags = STATE_HAS_FLAGS (state); + if (G_UNLIKELY (flags && STATE_BUSY_COUNT (state) == 1)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "doing idle probes"); + /* pad is not active anymore, trigger idle callbacks */ + PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_IDLE, NULL, + post_probe_stopped); + GST_OBJECT_UNLOCK (pad); + } + if (G_UNLIKELY (STATE_IS_SET (state, STATE_UNLINKED))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "clearing unlinked pad"); + gst_object_replace ((GstObject **) & pad->priv->peer, NULL); + GST_OBJECT_UNLOCK (pad); + } } if (G_UNLIKELY (ret != GST_FLOW_OK)) goto pull_range_failed; - PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, buffer, - post_probe_stopped); + if (G_UNLIKELY (flags)) { + if (G_UNLIKELY (STATE_IS_SET (state, STATE_PROBES))) { + GST_OBJECT_LOCK (pad); + GST_DEBUG_OBJECT (pad, "doing data probes"); + PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, buffer, + post_probe_stopped); + GST_OBJECT_UNLOCK (pad); + } - needs_events = GST_PAD_NEEDS_EVENTS (pad); - if (G_UNLIKELY (needs_events)) { - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_EVENTS); + /* try to clear the need events flag */ + if (G_UNLIKELY (STATE_IS_SET (state, STATE_NEEDS_EVENTS))) { + PAD_STATE_UNSET (pad, STATE_NEEDS_EVENTS); - GST_DEBUG_OBJECT (pad, "we need to update the events"); - ret = gst_pad_update_events (pad); - if (G_UNLIKELY (ret != GST_FLOW_OK)) - goto events_error; + GST_DEBUG_OBJECT (pad, "need to update all events"); + ret = gst_pad_update_events (pad); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto events_error; + } } - GST_OBJECT_UNLOCK (pad); return ret; @@ -4110,7 +4281,6 @@ flushing: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pullrange, but pad was flushing"); - GST_OBJECT_UNLOCK (pad); return GST_FLOW_WRONG_STATE; } pre_probe_stopped: @@ -4124,13 +4294,13 @@ not_linked: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pulling range, but it was not linked"); + gst_object_replace ((GstObject **) & pad->priv->peer, NULL); GST_OBJECT_UNLOCK (pad); return GST_FLOW_NOT_LINKED; } pull_range_failed: { *buffer = NULL; - GST_OBJECT_UNLOCK (pad); GST_CAT_LEVEL_LOG (GST_CAT_SCHEDULING, (ret >= GST_FLOW_UNEXPECTED) ? GST_LEVEL_INFO : GST_LEVEL_WARNING, pad, "pullrange failed, flow: %s", gst_flow_get_name (ret)); @@ -4148,7 +4318,6 @@ post_probe_stopped: } events_error: { - GST_OBJECT_UNLOCK (pad); gst_buffer_unref (*buffer); *buffer = NULL; GST_CAT_WARNING_OBJECT (GST_CAT_SCHEDULING, pad, @@ -4197,6 +4366,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: GST_PAD_SET_FLUSHING (pad); + PAD_STATE_SET (pad, STATE_FLUSHING); if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) { /* flush start will have set the FLUSHING flag and will then @@ -4210,6 +4380,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) break; case GST_EVENT_FLUSH_STOP: GST_PAD_UNSET_FLUSHING (pad); + PAD_STATE_UNSET (pad, STATE_FLUSHING); if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) { GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop"); goto flushed; @@ -4407,12 +4578,14 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) goto flushing; GST_PAD_SET_FLUSHING (pad); + PAD_STATE_SET (pad, STATE_FLUSHING); GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag"); needs_events = FALSE; break; case GST_EVENT_FLUSH_STOP: if (G_LIKELY (GST_PAD_ACTIVATE_MODE (pad) != GST_ACTIVATE_NONE)) { GST_PAD_UNSET_FLUSHING (pad); + PAD_STATE_UNSET (pad, STATE_FLUSHING); GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "cleared flush flag"); } GST_OBJECT_UNLOCK (pad); @@ -4475,6 +4648,7 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) /* set the flag so that we update the events next time. We would * usually update below but we might be flushing too. */ GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_EVENTS); + PAD_STATE_SET (pad, STATE_NEEDS_EVENTS); needs_events = TRUE; } } @@ -4492,12 +4666,13 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) GstFlowReturn ret; GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_EVENTS); + PAD_STATE_UNSET (pad, STATE_NEEDS_EVENTS); + GST_OBJECT_UNLOCK (pad); GST_DEBUG_OBJECT (pad, "need to update all events"); ret = gst_pad_update_events (pad); if (ret != GST_FLOW_OK) goto update_failed; - GST_OBJECT_UNLOCK (pad); gst_event_unref (event); @@ -4567,7 +4742,6 @@ probe_stopped: } update_failed: { - GST_OBJECT_UNLOCK (pad); if (need_unlock) GST_PAD_STREAM_UNLOCK (pad); GST_CAT_INFO_OBJECT (GST_CAT_EVENT, pad, "Update events failed"); |