summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2015-03-19 11:39:38 +0100
committerSebastian Dröge <sebastian@centricular.com>2015-03-19 11:54:37 +0100
commit57ff27f8c8cd3c306fe32efac338e4f11e1b01e1 (patch)
treee15a9f4589e7352656f9df28279e34d7cf1ad8e8
parent1c27002ebd5880a1b8f09af72879424c09f29308 (diff)
rtprtxqueue: Implement support for buffer lists
-rw-r--r--gst/rtpmanager/gstrtprtxqueue.c56
1 files changed, 51 insertions, 5 deletions
diff --git a/gst/rtpmanager/gstrtprtxqueue.c b/gst/rtpmanager/gstrtprtxqueue.c
index 209aa766c..c5b88dd35 100644
--- a/gst/rtpmanager/gstrtprtxqueue.c
+++ b/gst/rtpmanager/gstrtprtxqueue.c
@@ -64,6 +64,8 @@ static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
+static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
element, GstStateChange transition);
@@ -159,6 +161,8 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
gst_pad_set_chain_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
+ gst_pad_set_chain_list_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
rtx->queue = g_queue_new ();
@@ -246,6 +250,16 @@ do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
gst_pad_push (rtx->srcpad, buffer);
}
+/* Must be called with rtx->lock */
+static void
+shrink_queue (GstRTPRtxQueue * rtx)
+{
+ if (rtx->max_size_packets) {
+ while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
+ gst_buffer_unref (g_queue_pop_tail (rtx->queue));
+ }
+}
+
static GstFlowReturn
gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
@@ -257,11 +271,7 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
g_mutex_lock (&rtx->lock);
g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
-
- if (rtx->max_size_packets) {
- while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
- gst_buffer_unref (g_queue_pop_tail (rtx->queue));
- }
+ shrink_queue (rtx);
pending = rtx->pending;
rtx->pending = NULL;
@@ -275,6 +285,42 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
return ret;
}
+static gboolean
+push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+ GQueue *queue = user_data;
+
+ g_queue_push_head (queue, gst_buffer_ref (*buffer));
+
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstRTPRtxQueue *rtx;
+ GstFlowReturn ret;
+ GList *pending;
+
+ rtx = GST_RTP_RTX_QUEUE (parent);
+
+ g_mutex_lock (&rtx->lock);
+ gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
+ shrink_queue (rtx);
+
+ pending = rtx->pending;
+ rtx->pending = NULL;
+ g_mutex_unlock (&rtx->lock);
+
+ g_list_foreach (pending, (GFunc) do_push, rtx);
+ g_list_free (pending);
+
+ ret = gst_pad_push_list (rtx->srcpad, list);
+
+ return ret;
+}
+
static void
gst_rtp_rtx_queue_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)