diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2015-02-17 11:44:40 +0200 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2015-02-18 11:03:08 +0200 |
commit | 6369ba06ff2f26a2fb2d11ea5aa47f444b89f88a (patch) | |
tree | dae74c44ac3749f379741ce4594adb8a1e14aeb4 | |
parent | 927666642ef49ea2fa1f4e90efed39efda65278f (diff) |
queue: Add support for buffer lists
-rw-r--r-- | plugins/elements/gstqueue.c | 244 |
1 files changed, 212 insertions, 32 deletions
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index ebd88cd68..eb3ed955e 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -195,6 +195,8 @@ static void gst_queue_get_property (GObject * object, static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list); static GstFlowReturn gst_queue_push_one (GstQueue * queue); static void gst_queue_loop (GstPad * pad); @@ -417,6 +419,7 @@ gst_queue_class_init (GstQueueClass * klass) GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event); GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query); GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain); + GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list); } static void @@ -425,6 +428,7 @@ gst_queue_init (GstQueue * queue) queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain); + gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list); gst_pad_set_activatemode_function (queue->sinkpad, gst_queue_sink_activate_mode); gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event); @@ -622,6 +626,60 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, update_time_level (queue); } +typedef struct +{ + GstClockTime timestamp; + gboolean with_duration; +} BufferListApplyTimeData; + +static gboolean +buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data) +{ + BufferListApplyTimeData *data = user_data; + + GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT + " duration %" GST_TIME_FORMAT, idx, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); + + if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf)) + data->timestamp = GST_BUFFER_TIMESTAMP (*buf); + + if (data->with_duration && GST_BUFFER_DURATION_IS_VALID (*buf)) + data->timestamp += GST_BUFFER_DURATION (*buf); + + GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (data->timestamp)); + + return TRUE; +} + +/* take a buffer list and update segment, updating the time level of the queue */ +static void +apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list, + GstSegment * segment, gboolean with_duration, gboolean sink) +{ + BufferListApplyTimeData data; + + /* if no timestamp is set, assume it's continuous with the previous time */ + data.timestamp = segment->position; + data.with_duration = with_duration; + + gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &data); + + GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT, + GST_TIME_ARGS (data.timestamp)); + + segment->position = data.timestamp; + + if (sink) + queue->sink_tainted = TRUE; + else + queue->src_tainted = TRUE; + + /* calc diff with other end */ + update_time_level (queue); +} + static void gst_queue_locked_flush (GstQueue * queue, gboolean full) { @@ -678,6 +736,40 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) GST_QUEUE_SIGNAL_ADD (queue); } +static gboolean +buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data) +{ + guint *p_size = data; + gsize buf_size; + + buf_size = gst_buffer_get_size (*buf); + GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size); + *p_size += buf_size; + return TRUE; +} + +static inline void +gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item) +{ + GstQueueItem *qitem; + GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item); + gsize bsize = 0; + + gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize); + + /* add buffer to the statistics */ + queue->cur_level.buffers += gst_buffer_list_length (buffer_list); + queue->cur_level.bytes += bsize; + apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE, TRUE); + + qitem = g_slice_new (GstQueueItem); + qitem->item = item; + qitem->is_query = FALSE; + qitem->size = bsize; + gst_queue_array_push_tail (queue->queue, qitem); + GST_QUEUE_SIGNAL_ADD (queue); +} + static inline void gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) { @@ -751,7 +843,19 @@ gst_queue_locked_dequeue (GstQueue * queue) /* if the queue is empty now, update the other side */ if (queue->cur_level.buffers == 0) queue->cur_level.time = 0; + } else if (GST_IS_BUFFER_LIST (item)) { + GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "retrieved buffer list %p from queue", buffer_list); + queue->cur_level.buffers -= gst_buffer_list_length (buffer_list); + queue->cur_level.bytes -= bufsize; + apply_buffer_list (queue, buffer_list, &queue->src_segment, TRUE, FALSE); + + /* if the queue is empty now, update the other side */ + if (queue->cur_level.buffers == 0) + queue->cur_level.time = 0; } else if (GST_IS_EVENT (item)) { GstEvent *event = GST_EVENT_CAST (item); @@ -1014,11 +1118,27 @@ gst_queue_leak_downstream (GstQueue * queue) } } +static gboolean +discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data) +{ + GstQueue *queue = user_data; + GstBuffer *subbuffer = gst_buffer_make_writable (*buffer); + + if (subbuffer) { + *buffer = subbuffer; + GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT); + } else { + GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT"); + } + + return FALSE; +} + static GstFlowReturn -gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent, + GstMiniObject * obj, gboolean is_list) { GstQueue *queue; - GstClockTime duration, timestamp; queue = GST_QUEUE_CAST (parent); @@ -1030,13 +1150,22 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) if (queue->unexpected) goto out_unexpected; - timestamp = GST_BUFFER_TIMESTAMP (buffer); - duration = GST_BUFFER_DURATION (buffer); + if (!is_list) { + GstClockTime duration, timestamp; + GstBuffer *buffer = GST_BUFFER_CAST (obj); - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %" - G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %" - GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer), - GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration)); + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %" + G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %" + GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer), + GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration)); + } else { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "received buffer list %p with %u buffers", obj, + gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj))); + } /* We make space available if we're "full" according to whatever * the user defined as "full". Note that this only applies to buffers. @@ -1091,19 +1220,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) } if (queue->tail_needs_discont) { - GstBuffer *subbuffer = gst_buffer_make_writable (buffer); + if (!is_list) { + GstBuffer *buffer = GST_BUFFER_CAST (obj); + GstBuffer *subbuffer = gst_buffer_make_writable (buffer); + + if (subbuffer) { + buffer = subbuffer; + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + } else { + GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT"); + } - if (subbuffer) { - buffer = subbuffer; - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + obj = GST_MINI_OBJECT_CAST (buffer); } else { - GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT"); + GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj); + + buffer_list = gst_buffer_list_make_writable (buffer_list); + gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue); + obj = GST_MINI_OBJECT_CAST (buffer_list); } queue->tail_needs_discont = FALSE; } /* put buffer in queue now */ - gst_queue_locked_enqueue_buffer (queue, buffer); + if (is_list) + gst_queue_locked_enqueue_buffer_list (queue, obj); + else + gst_queue_locked_enqueue_buffer (queue, obj); GST_QUEUE_MUTEX_UNLOCK (queue); return GST_FLOW_OK; @@ -1113,7 +1256,7 @@ out_unref: { GST_QUEUE_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (obj); return GST_FLOW_OK; } @@ -1124,7 +1267,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); GST_QUEUE_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (obj); return ret; } @@ -1133,7 +1276,7 @@ out_eos: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (obj); return GST_FLOW_EOS; } @@ -1142,12 +1285,27 @@ out_unexpected: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (obj); return GST_FLOW_EOS; } } +static GstFlowReturn +gst_queue_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list) +{ + return gst_queue_chain_buffer_or_list (pad, parent, + GST_MINI_OBJECT_CAST (buffer_list), TRUE); +} + +static GstFlowReturn +gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + return gst_queue_chain_buffer_or_list (pad, parent, + GST_MINI_OBJECT_CAST (buffer), FALSE); +} + /* dequeue an item from the queue an push it downstream. This functions returns * the result of the push. */ static GstFlowReturn @@ -1155,31 +1313,49 @@ gst_queue_push_one (GstQueue * queue) { GstFlowReturn result = queue->srcresult; GstMiniObject *data; + gboolean is_list; data = gst_queue_locked_dequeue (queue); if (data == NULL) goto no_item; next: - if (GST_IS_BUFFER (data)) { - GstBuffer *buffer; + is_list = GST_IS_BUFFER_LIST (data); - buffer = GST_BUFFER_CAST (data); + if (GST_IS_BUFFER (data) || is_list) { + if (!is_list) { + GstBuffer *buffer; - if (queue->head_needs_discont) { - GstBuffer *subbuffer = gst_buffer_make_writable (buffer); + buffer = GST_BUFFER_CAST (data); - if (subbuffer) { - buffer = subbuffer; - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - } else { - GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT"); + if (queue->head_needs_discont) { + GstBuffer *subbuffer = gst_buffer_make_writable (buffer); + + if (subbuffer) { + buffer = subbuffer; + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + } else { + GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT"); + } + queue->head_needs_discont = FALSE; } - queue->head_needs_discont = FALSE; - } - GST_QUEUE_MUTEX_UNLOCK (queue); - result = gst_pad_push (queue->srcpad, buffer); + GST_QUEUE_MUTEX_UNLOCK (queue); + result = gst_pad_push (queue->srcpad, buffer); + } else { + GstBufferList *buffer_list; + + buffer_list = GST_BUFFER_LIST_CAST (data); + + if (queue->head_needs_discont) { + buffer_list = gst_buffer_list_make_writable (buffer_list); + gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue); + queue->head_needs_discont = FALSE; + } + + GST_QUEUE_MUTEX_UNLOCK (queue); + result = gst_pad_push_list (queue->srcpad, buffer_list); + } /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); @@ -1195,6 +1371,10 @@ next: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS buffer %p", data); gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (GST_IS_BUFFER_LIST (data)) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS buffer list %p", data); + gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data)); } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); |