summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2015-02-17 11:44:40 +0200
committerSebastian Dröge <sebastian@centricular.com>2015-02-18 11:03:08 +0200
commit6369ba06ff2f26a2fb2d11ea5aa47f444b89f88a (patch)
treedae74c44ac3749f379741ce4594adb8a1e14aeb4
parent927666642ef49ea2fa1f4e90efed39efda65278f (diff)
queue: Add support for buffer lists
-rw-r--r--plugins/elements/gstqueue.c244
1 files changed, 212 insertions, 32 deletions
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index ebd88cd68..eb3ed955e 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -195,6 +195,8 @@ static void gst_queue_get_property (GObject * object,
static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
+static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list);
static GstFlowReturn gst_queue_push_one (GstQueue * queue);
static void gst_queue_loop (GstPad * pad);
@@ -417,6 +419,7 @@ gst_queue_class_init (GstQueueClass * klass)
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
}
static void
@@ -425,6 +428,7 @@ gst_queue_init (GstQueue * queue)
queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+ gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
gst_pad_set_activatemode_function (queue->sinkpad,
gst_queue_sink_activate_mode);
gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
@@ -622,6 +626,60 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
update_time_level (queue);
}
+typedef struct
+{
+ GstClockTime timestamp;
+ gboolean with_duration;
+} BufferListApplyTimeData;
+
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
+{
+ BufferListApplyTimeData *data = user_data;
+
+ GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
+ " duration %" GST_TIME_FORMAT, idx,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+ data->timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+ if (data->with_duration && GST_BUFFER_DURATION_IS_VALID (*buf))
+ data->timestamp += GST_BUFFER_DURATION (*buf);
+
+ GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (data->timestamp));
+
+ return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
+ GstSegment * segment, gboolean with_duration, gboolean sink)
+{
+ BufferListApplyTimeData data;
+
+ /* if no timestamp is set, assume it's continuous with the previous time */
+ data.timestamp = segment->position;
+ data.with_duration = with_duration;
+
+ gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &data);
+
+ GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (data.timestamp));
+
+ segment->position = data.timestamp;
+
+ if (sink)
+ queue->sink_tainted = TRUE;
+ else
+ queue->src_tainted = TRUE;
+
+ /* calc diff with other end */
+ update_time_level (queue);
+}
+
static void
gst_queue_locked_flush (GstQueue * queue, gboolean full)
{
@@ -678,6 +736,40 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
GST_QUEUE_SIGNAL_ADD (queue);
}
+static gboolean
+buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
+{
+ guint *p_size = data;
+ gsize buf_size;
+
+ buf_size = gst_buffer_get_size (*buf);
+ GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
+ *p_size += buf_size;
+ return TRUE;
+}
+
+static inline void
+gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
+{
+ GstQueueItem *qitem;
+ GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+ gsize bsize = 0;
+
+ gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
+
+ /* add buffer to the statistics */
+ queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
+ queue->cur_level.bytes += bsize;
+ apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE, TRUE);
+
+ qitem = g_slice_new (GstQueueItem);
+ qitem->item = item;
+ qitem->is_query = FALSE;
+ qitem->size = bsize;
+ gst_queue_array_push_tail (queue->queue, qitem);
+ GST_QUEUE_SIGNAL_ADD (queue);
+}
+
static inline void
gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
{
@@ -751,7 +843,19 @@ gst_queue_locked_dequeue (GstQueue * queue)
/* if the queue is empty now, update the other side */
if (queue->cur_level.buffers == 0)
queue->cur_level.time = 0;
+ } else if (GST_IS_BUFFER_LIST (item)) {
+ GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "retrieved buffer list %p from queue", buffer_list);
+ queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
+ queue->cur_level.bytes -= bufsize;
+ apply_buffer_list (queue, buffer_list, &queue->src_segment, TRUE, FALSE);
+
+ /* if the queue is empty now, update the other side */
+ if (queue->cur_level.buffers == 0)
+ queue->cur_level.time = 0;
} else if (GST_IS_EVENT (item)) {
GstEvent *event = GST_EVENT_CAST (item);
@@ -1014,11 +1118,27 @@ gst_queue_leak_downstream (GstQueue * queue)
}
}
+static gboolean
+discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
+{
+ GstQueue *queue = user_data;
+ GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
+
+ if (subbuffer) {
+ *buffer = subbuffer;
+ GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
+ } else {
+ GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+ }
+
+ return FALSE;
+}
+
static GstFlowReturn
-gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
+ GstMiniObject * obj, gboolean is_list)
{
GstQueue *queue;
- GstClockTime duration, timestamp;
queue = GST_QUEUE_CAST (parent);
@@ -1030,13 +1150,22 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
if (queue->unexpected)
goto out_unexpected;
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
- duration = GST_BUFFER_DURATION (buffer);
+ if (!is_list) {
+ GstClockTime duration, timestamp;
+ GstBuffer *buffer = GST_BUFFER_CAST (obj);
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
- G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
- GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
- GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ duration = GST_BUFFER_DURATION (buffer);
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+ G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+ GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+ GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+ } else {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "received buffer list %p with %u buffers", obj,
+ gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
+ }
/* We make space available if we're "full" according to whatever
* the user defined as "full". Note that this only applies to buffers.
@@ -1091,19 +1220,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
}
if (queue->tail_needs_discont) {
- GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+ if (!is_list) {
+ GstBuffer *buffer = GST_BUFFER_CAST (obj);
+ GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+ if (subbuffer) {
+ buffer = subbuffer;
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ } else {
+ GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+ }
- if (subbuffer) {
- buffer = subbuffer;
- GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ obj = GST_MINI_OBJECT_CAST (buffer);
} else {
- GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+ GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
+
+ buffer_list = gst_buffer_list_make_writable (buffer_list);
+ gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+ obj = GST_MINI_OBJECT_CAST (buffer_list);
}
queue->tail_needs_discont = FALSE;
}
/* put buffer in queue now */
- gst_queue_locked_enqueue_buffer (queue, buffer);
+ if (is_list)
+ gst_queue_locked_enqueue_buffer_list (queue, obj);
+ else
+ gst_queue_locked_enqueue_buffer (queue, obj);
GST_QUEUE_MUTEX_UNLOCK (queue);
return GST_FLOW_OK;
@@ -1113,7 +1256,7 @@ out_unref:
{
GST_QUEUE_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (obj);
return GST_FLOW_OK;
}
@@ -1124,7 +1267,7 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (obj);
return ret;
}
@@ -1133,7 +1276,7 @@ out_eos:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (obj);
return GST_FLOW_EOS;
}
@@ -1142,12 +1285,27 @@ out_unexpected:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (obj);
return GST_FLOW_EOS;
}
}
+static GstFlowReturn
+gst_queue_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list)
+{
+ return gst_queue_chain_buffer_or_list (pad, parent,
+ GST_MINI_OBJECT_CAST (buffer_list), TRUE);
+}
+
+static GstFlowReturn
+gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ return gst_queue_chain_buffer_or_list (pad, parent,
+ GST_MINI_OBJECT_CAST (buffer), FALSE);
+}
+
/* dequeue an item from the queue an push it downstream. This functions returns
* the result of the push. */
static GstFlowReturn
@@ -1155,31 +1313,49 @@ gst_queue_push_one (GstQueue * queue)
{
GstFlowReturn result = queue->srcresult;
GstMiniObject *data;
+ gboolean is_list;
data = gst_queue_locked_dequeue (queue);
if (data == NULL)
goto no_item;
next:
- if (GST_IS_BUFFER (data)) {
- GstBuffer *buffer;
+ is_list = GST_IS_BUFFER_LIST (data);
- buffer = GST_BUFFER_CAST (data);
+ if (GST_IS_BUFFER (data) || is_list) {
+ if (!is_list) {
+ GstBuffer *buffer;
- if (queue->head_needs_discont) {
- GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+ buffer = GST_BUFFER_CAST (data);
- if (subbuffer) {
- buffer = subbuffer;
- GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
- } else {
- GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+ if (queue->head_needs_discont) {
+ GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+ if (subbuffer) {
+ buffer = subbuffer;
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ } else {
+ GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+ }
+ queue->head_needs_discont = FALSE;
}
- queue->head_needs_discont = FALSE;
- }
- GST_QUEUE_MUTEX_UNLOCK (queue);
- result = gst_pad_push (queue->srcpad, buffer);
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ result = gst_pad_push (queue->srcpad, buffer);
+ } else {
+ GstBufferList *buffer_list;
+
+ buffer_list = GST_BUFFER_LIST_CAST (data);
+
+ if (queue->head_needs_discont) {
+ buffer_list = gst_buffer_list_make_writable (buffer_list);
+ gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+ queue->head_needs_discont = FALSE;
+ }
+
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ result = gst_pad_push_list (queue->srcpad, buffer_list);
+ }
/* need to check for srcresult here as well */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@@ -1195,6 +1371,10 @@ next:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
+ } else if (GST_IS_BUFFER_LIST (data)) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping EOS buffer list %p", data);
+ gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
} else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);