diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2015-03-19 11:39:38 +0100 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2015-03-19 11:54:37 +0100 |
commit | 57ff27f8c8cd3c306fe32efac338e4f11e1b01e1 (patch) | |
tree | e15a9f4589e7352656f9df28279e34d7cf1ad8e8 | |
parent | 1c27002ebd5880a1b8f09af72879424c09f29308 (diff) |
rtprtxqueue: Implement support for buffer lists
-rw-r--r-- | gst/rtpmanager/gstrtprtxqueue.c | 56 |
1 files changed, 51 insertions, 5 deletions
diff --git a/gst/rtpmanager/gstrtprtxqueue.c b/gst/rtpmanager/gstrtprtxqueue.c index 209aa766c..c5b88dd35 100644 --- a/gst/rtpmanager/gstrtprtxqueue.c +++ b/gst/rtpmanager/gstrtprtxqueue.c @@ -64,6 +64,8 @@ static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event); static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad, + GstObject * parent, GstBufferList * list); static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition); @@ -159,6 +161,8 @@ gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx) GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); gst_pad_set_chain_function (rtx->sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain)); + gst_pad_set_chain_list_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list)); gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); rtx->queue = g_queue_new (); @@ -246,6 +250,16 @@ do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx) gst_pad_push (rtx->srcpad, buffer); } +/* Must be called with rtx->lock */ +static void +shrink_queue (GstRTPRtxQueue * rtx) +{ + if (rtx->max_size_packets) { + while (g_queue_get_length (rtx->queue) > rtx->max_size_packets) + gst_buffer_unref (g_queue_pop_tail (rtx->queue)); + } +} + static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { @@ -257,11 +271,7 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) g_mutex_lock (&rtx->lock); g_queue_push_head (rtx->queue, gst_buffer_ref (buffer)); - - if (rtx->max_size_packets) { - while (g_queue_get_length (rtx->queue) > rtx->max_size_packets) - gst_buffer_unref (g_queue_pop_tail (rtx->queue)); - } + shrink_queue (rtx); pending = rtx->pending; rtx->pending = NULL; @@ -275,6 +285,42 @@ gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) return ret; } +static gboolean +push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data) +{ + GQueue *queue = user_data; + + g_queue_push_head (queue, gst_buffer_ref (*buffer)); + + return TRUE; +} + +static GstFlowReturn +gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstRTPRtxQueue *rtx; + GstFlowReturn ret; + GList *pending; + + rtx = GST_RTP_RTX_QUEUE (parent); + + g_mutex_lock (&rtx->lock); + gst_buffer_list_foreach (list, push_to_queue, rtx->queue); + shrink_queue (rtx); + + pending = rtx->pending; + rtx->pending = NULL; + g_mutex_unlock (&rtx->lock); + + g_list_foreach (pending, (GFunc) do_push, rtx); + g_list_free (pending); + + ret = gst_pad_push_list (rtx->srcpad, list); + + return ret; +} + static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) |