summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2012-04-18 15:17:53 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2012-04-18 15:17:53 +0200
commitc366425ef05dffc70922564bada374cd267253e0 (patch)
treef8800f1d09f503f156d7f5e4e11530842839d475
parent6fae13702717a72f609963f5879c8ea12ab4aed4 (diff)
pad: first attempt at pad cachepad-cache
We keep a state variable with a refcount and a valid flag on each srcpad. The refcount allows us to see how many threads are busy in pad_push and allows us to control the refcount of the cached peer pad. The valid flag controls if the fast path should be taken or not. The fast path is only enabled when there are no probes and we are not flushing, TODO: invalide the cache for sending sticky events convert the other functions using the old ->using refcount.
-rw-r--r--gst/gstpad.c196
-rw-r--r--tests/check/gst/gstpad.c4
2 files changed, 178 insertions, 22 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c
index 046420904..0ed6bdee9 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -140,8 +140,42 @@ struct _GstPadPrivate
gint using;
guint probe_list_cookie;
guint probe_cookie;
+
+ /* cache */
+ gint state;
+ GstPad *peer;
};
+/* state masks
+ * 3 2 1 0
+ * 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | using | M |V|
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ *
+ * V: valid flag
+ * M: pad mode
+ * I: idle probe installed
+ */
+
+#define PAD_VALID_FLAG (1 << 0)
+#define PAD_USING_VAL (1 << 4)
+#define PAD_STATE_MASK (PAD_USING_VAL - 1)
+#define PAD_STATE_VALID_MASK (PAD_STATE_MASK - PAD_VALID_FLAG)
+#define PAD_STATE_VALID_MODE(s,mode) (((s) & PAD_STATE_MASK) == (PAD_VALID_FLAG | (mode << 2)))
+#define PAD_STATE_VALID(s) (((s) & PAD_VALID_FLAG) == PAD_VALID_FLAG)
+#define PAD_STATE_LEAVE_IDLE(s) (((s) & ~PAD_STATE_MASK) == PAD_USING_VAL)
+#define PAD_STATE_WAS_VALID_IDLE(s) (((s) & ~PAD_STATE_VALID_MASK) == PAD_VALID_FLAG)
+#define PAD_STATE_WAS_IDLE(s) (((s) & ~PAD_STATE_MASK) == 0)
+
+#define PAD_CACHE_INIT(p) g_atomic_int_set (&(p)->priv->state, 0)
+#define PAD_CACHE_SET_MODE(p,m) g_atomic_int_or (&(p)->priv->state, ((m) << 2))
+#define PAD_CACHE_ENTER(p) g_atomic_int_add (&(p)->priv->state, PAD_USING_VAL)
+#define PAD_CACHE_LEAVE(p) g_atomic_int_add (&(p)->priv->state, -PAD_USING_VAL)
+#define PAD_CACHE_PEER(p) ((p)->priv->peer)
+#define PAD_CACHE_UNSET_VALID(p) g_atomic_int_and (&(p)->priv->state, ~PAD_VALID_FLAG)
+#define PAD_CACHE_SET_VALID(p) g_atomic_int_or (&(p)->priv->state, PAD_VALID_FLAG)
+
typedef struct
{
GHook hook;
@@ -347,6 +381,7 @@ gst_pad_init (GstPad * pad)
GST_PAD_ITERINTLINKFUNC (pad) = gst_pad_iterate_internal_links_default;
GST_PAD_CHAINLISTFUNC (pad) = gst_pad_chain_list_default;
+ PAD_CACHE_INIT (pad);
GST_PAD_SET_FLUSHING (pad);
g_rec_mutex_init (&pad->stream_rec_lock);
@@ -358,6 +393,30 @@ gst_pad_init (GstPad * pad)
pad->priv->events = g_array_sized_new (FALSE, TRUE, sizeof (PadEvent), 16);
}
+static gint
+invalidate_cache (GstPad * pad)
+{
+ GstPad *peer;
+ gint state;
+
+ /* we hold the pad lock here so we can get the peer and it stays
+ * alive during this call */
+ if (GST_PAD_IS_SINK (pad)) {
+ if (!(pad = GST_PAD_PEER (pad)))
+ return 0;
+ }
+ GST_LOG_OBJECT (pad, "Invalidating pad cache");
+
+ peer = PAD_CACHE_PEER (pad);
+ state = PAD_CACHE_UNSET_VALID (pad);
+ PAD_CACHE_PEER (pad) = NULL;
+
+ if (PAD_STATE_WAS_VALID_IDLE (state) && peer)
+ gst_object_unref (peer);
+
+ return state;
+}
+
/* called when setting the pad inactive. It removes all sticky events from
* the pad */
static void
@@ -644,6 +703,9 @@ gst_pad_finalize (GObject * object)
if (pad->iterintlinknotify)
pad->iterintlinknotify (pad->iterintlinkdata);
+ if (PAD_CACHE_PEER (pad))
+ gst_object_unref (PAD_CACHE_PEER (pad));
+
g_rec_mutex_clear (&pad->stream_rec_lock);
g_cond_clear (&pad->block_cond);
g_array_free (pad->priv->events, TRUE);
@@ -815,8 +877,10 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
case GST_PAD_MODE_NONE:
GST_OBJECT_LOCK (pad);
GST_DEBUG_OBJECT (pad, "setting PAD_MODE NONE, set flushing");
+ invalidate_cache (pad);
GST_PAD_SET_FLUSHING (pad);
GST_PAD_MODE (pad) = new_mode;
+ PAD_CACHE_SET_MODE (pad, new_mode);
/* unlock blocked pads so element can resume and stop */
GST_PAD_BLOCK_BROADCAST (pad);
GST_OBJECT_UNLOCK (pad);
@@ -827,6 +891,7 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
GST_DEBUG_OBJECT (pad, "setting PAD_MODE %d, unset flushing", new_mode);
GST_PAD_UNSET_FLUSHING (pad);
GST_PAD_MODE (pad) = new_mode;
+ PAD_CACHE_SET_MODE (pad, new_mode);
if (GST_PAD_IS_SINK (pad)) {
GstPad *peer;
/* make sure the peer src pad sends us all events */
@@ -1092,8 +1157,10 @@ failure:
GST_OBJECT_LOCK (pad);
GST_CAT_INFO_OBJECT (GST_CAT_PADS, pad, "failed to %s in mode %d",
active ? "activate" : "deactivate", mode);
+ invalidate_cache (pad);
GST_PAD_SET_FLUSHING (pad);
GST_PAD_MODE (pad) = old;
+ PAD_CACHE_SET_MODE (pad, old);
GST_OBJECT_UNLOCK (pad);
goto exit;
}
@@ -1147,6 +1214,7 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
{
GHook *hook;
gulong res;
+ gint state;
g_return_val_if_fail (GST_IS_PAD (pad), 0);
g_return_val_if_fail (mask != 0, 0);
@@ -1176,7 +1244,8 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
/* add the probe */
g_hook_prepend (&pad->probes, hook);
pad->num_probes++;
- /* incremenent cookie so that the new hook get's called */
+ state = invalidate_cache (pad);
+ /* incremenent cookie so that the new hook gets called */
pad->priv->probe_list_cookie++;
/* get the id of the hook, we return this and it can be used to remove the
@@ -1195,7 +1264,7 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
/* call the callback if we need to be called for idle callbacks */
if ((mask & GST_PAD_PROBE_TYPE_IDLE) && (callback != NULL)) {
- if (pad->priv->using > 0) {
+ if (!PAD_STATE_WAS_IDLE (state)) {
/* the pad is in use, we can't signal the idle callback yet. Since we set the
* flag above, the last thread to leave the push will do the callback. New
* threads going into the push will block. */
@@ -1710,6 +1779,8 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
GST_PAD_UNLINKFUNC (sinkpad) (sinkpad);
}
+ invalidate_cache (srcpad);
+
/* first clear peers */
GST_PAD_PEER (srcpad) = NULL;
GST_PAD_PEER (sinkpad) = NULL;
@@ -2991,9 +3062,9 @@ no_match:
} \
} G_STMT_END
-#define PROBE_PUSH(pad,mask,data,label) \
+#define PROBE_PUSH(pad,mask,data,label) \
PROBE_FULL(pad, mask, data, -1, -1, label);
-#define PROBE_PULL(pad,mask,data,offs,size,label) \
+#define PROBE_PULL(pad,mask,data,offs,size,label) \
PROBE_FULL(pad, mask, data, offs, size, label);
static GstFlowReturn
@@ -3462,7 +3533,8 @@ probe_stopped:
* checking for that little extra speed.
*/
static inline GstFlowReturn
-gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
+gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data,
+ gboolean * cache)
{
GstFlowReturn ret;
GstObject *parent;
@@ -3480,6 +3552,8 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
PROBE_PUSH (pad, type, data, probe_stopped);
+ *cache &= pad->num_probes == 0;
+
parent = GST_OBJECT_PARENT (pad);
GST_OBJECT_UNLOCK (pad);
@@ -3532,6 +3606,7 @@ flushing:
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+ *cache = FALSE;
return GST_FLOW_FLUSHING;
}
wrong_mode:
@@ -3541,10 +3616,12 @@ wrong_mode:
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+ *cache = FALSE;
return GST_FLOW_ERROR;
}
probe_stopped:
{
+ *cache &= pad->num_probes == 0;
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
@@ -3566,6 +3643,7 @@ no_function:
g_critical ("chain on pad %s:%s but it has no chainfunction",
GST_DEBUG_PAD_NAME (pad));
GST_PAD_STREAM_UNLOCK (pad);
+ *cache = FALSE;
return GST_FLOW_NOT_SUPPORTED;
}
}
@@ -3599,12 +3677,14 @@ no_function:
GstFlowReturn
gst_pad_chain (GstPad * pad, GstBuffer * buffer)
{
+ gboolean cache = FALSE;
+
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
return gst_pad_chain_data_unchecked (pad,
- GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH, buffer);
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH, buffer, &cache);
}
static GstFlowReturn
@@ -3614,6 +3694,7 @@ gst_pad_chain_list_default (GstPad * pad, GstObject * parent,
guint i, len;
GstBuffer *buffer;
GstFlowReturn ret;
+ gboolean cache = FALSE;
GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
@@ -3625,7 +3706,7 @@ gst_pad_chain_list_default (GstPad * pad, GstObject * parent,
ret =
gst_pad_chain_data_unchecked (pad,
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH,
- gst_buffer_ref (buffer));
+ gst_buffer_ref (buffer), &cache);
if (ret != GST_FLOW_OK)
break;
}
@@ -3664,12 +3745,14 @@ gst_pad_chain_list_default (GstPad * pad, GstObject * parent,
GstFlowReturn
gst_pad_chain_list (GstPad * pad, GstBufferList * list)
{
+ gboolean cache = FALSE;
+
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER_LIST (list), GST_FLOW_ERROR);
return gst_pad_chain_data_unchecked (pad,
- GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH, list);
+ GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH, list, &cache);
}
static GstFlowReturn
@@ -3677,6 +3760,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
{
GstPad *peer;
GstFlowReturn ret;
+ gboolean cache;
GST_OBJECT_LOCK (pad);
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
@@ -3697,23 +3781,27 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
+ cache = pad->num_probes == 0;
+
/* take ref to peer pad before releasing the lock */
gst_object_ref (peer);
- pad->priv->using++;
GST_OBJECT_UNLOCK (pad);
- ret = gst_pad_chain_data_unchecked (peer, type, data);
+ ret = gst_pad_chain_data_unchecked (peer, type, data, &cache);
- gst_object_unref (peer);
+ if (cache) {
+ GstPad *oldpeer;
- GST_OBJECT_LOCK (pad);
- pad->priv->using--;
- if (pad->priv->using == 0) {
- /* pad is not active anymore, trigger idle callbacks */
- PROBE_NO_DATA (pad, GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_IDLE,
- probe_stopped, ret);
+ GST_OBJECT_LOCK (pad);
+ GST_TRACE_OBJECT (pad, "caching peer %p for fast path", peer);
+ if ((oldpeer = PAD_CACHE_PEER (pad)))
+ gst_object_unref (oldpeer);
+ PAD_CACHE_PEER (pad) = peer;
+ PAD_CACHE_SET_VALID (pad);
+ GST_OBJECT_UNLOCK (pad);
+ } else {
+ gst_object_unref (peer);
}
- GST_OBJECT_UNLOCK (pad);
return ret;
@@ -3794,12 +3882,78 @@ not_linked:
GstFlowReturn
gst_pad_push (GstPad * pad, GstBuffer * buffer)
{
+ GstFlowReturn ret;
+ gint state;
+ GstPad *peer;
+
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_IS_SRC (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
- return gst_pad_push_data (pad,
- GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH, buffer);
+ state = PAD_CACHE_ENTER (pad);
+ if (G_UNLIKELY (!PAD_STATE_VALID_MODE (state, GST_PAD_MODE_PUSH)))
+ goto slow_enter;
+
+ peer = PAD_CACHE_PEER (pad);
+
+ GST_PAD_STREAM_LOCK (peer);
+
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "calling chainfunction &%s with buffer %" GST_PTR_FORMAT,
+ GST_DEBUG_FUNCPTR_NAME (GST_PAD_CHAINFUNC (peer)), buffer);
+
+ ret = GST_PAD_CHAINFUNC (peer) (peer, GST_OBJECT_PARENT (peer), buffer);
+
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "called chainfunction &%s with buffer %p, returned %s",
+ GST_DEBUG_FUNCPTR_NAME (GST_PAD_CHAINFUNC (peer)), buffer,
+ gst_flow_get_name (ret));
+
+ GST_PAD_STREAM_UNLOCK (peer);
+
+done:
+ state = PAD_CACHE_LEAVE (pad);
+ if (G_UNLIKELY (!PAD_STATE_VALID (state)))
+ goto slow_leave;
+
+ return ret;
+
+slow_enter:
+ {
+ GST_TRACE_OBJECT (pad, "taking slow enter");
+
+ ret = gst_pad_push_data (pad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH, buffer);
+
+ goto done;
+ }
+slow_leave:
+ {
+ GST_TRACE_OBJECT (pad, "taking slow leave");
+ if (PAD_STATE_LEAVE_IDLE (state)) {
+ GST_OBJECT_LOCK (pad);
+ /* pad is not active anymore, trigger idle callbacks */
+ PROBE_NO_DATA (pad, GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_IDLE,
+ probe_stopped, ret);
+ GST_OBJECT_UNLOCK (pad);
+ }
+ return ret;
+ }
+probe_stopped:
+ {
+ GST_OBJECT_UNLOCK (pad);
+
+ switch (ret) {
+ case GST_FLOW_CUSTOM_SUCCESS:
+ GST_DEBUG_OBJECT (pad, "dropped buffer");
+ ret = GST_FLOW_OK;
+ break;
+ default:
+ GST_DEBUG_OBJECT (pad, "an error occured %s", gst_flow_get_name (ret));
+ break;
+ }
+ return ret;
+ }
}
/**
@@ -4278,6 +4432,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
event_type = GST_EVENT_TYPE (event);
switch (event_type) {
case GST_EVENT_FLUSH_START:
+ invalidate_cache (pad);
GST_PAD_SET_FLUSHING (pad);
GST_PAD_BLOCK_BROADCAST (pad);
@@ -4543,6 +4698,7 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
+ invalidate_cache (pad);
GST_PAD_SET_FLUSHING (pad);
GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag");
break;
diff --git a/tests/check/gst/gstpad.c b/tests/check/gst/gstpad.c
index cb5ef3637..b7020d1c4 100644
--- a/tests/check/gst/gstpad.c
+++ b/tests/check/gst/gstpad.c
@@ -290,7 +290,7 @@ GST_START_TEST (test_sticky_caps_unlinked)
gst_caps_replace (&event_caps, NULL);
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
- ASSERT_OBJECT_REFCOUNT (sink, "sink", 1);
+ ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
gst_object_unref (src);
gst_object_unref (sink);
}
@@ -416,7 +416,7 @@ GST_START_TEST (test_sticky_caps_flushing)
gst_caps_replace (&event_caps, NULL);
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
- ASSERT_OBJECT_REFCOUNT (sink, "sink", 1);
+ ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
gst_object_unref (src);
gst_object_unref (sink);
}