summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/elements/gstmultiqueue.c79
1 files changed, 67 insertions, 12 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c
index c60596bac..aef27ad37 100644
--- a/plugins/elements/gstmultiqueue.c
+++ b/plugins/elements/gstmultiqueue.c
@@ -1209,7 +1209,7 @@ done:
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
- GstMiniObject * object)
+ GstMiniObject * object, gboolean * allow_drop)
{
GstFlowReturn result = sq->srcresult;
@@ -1226,11 +1226,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
/* Applying the buffer may have made the queue non-full again, unblock it if needed */
gst_data_queue_limits_changed (sq->queue);
- GST_DEBUG_OBJECT (mq,
- "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
- sq->id, buffer, GST_TIME_ARGS (timestamp));
-
- result = gst_pad_push (sq->srcpad, buffer);
+ if (G_UNLIKELY (*allow_drop)) {
+ GST_DEBUG_OBJECT (mq,
+ "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
+ sq->id, buffer, GST_TIME_ARGS (timestamp));
+ gst_buffer_unref (buffer);
+ } else {
+ GST_DEBUG_OBJECT (mq,
+ "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
+ sq->id, buffer, GST_TIME_ARGS (timestamp));
+ result = gst_pad_push (sq->srcpad, buffer);
+ }
} else if (GST_IS_EVENT (object)) {
GstEvent *event;
@@ -1239,11 +1245,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
result = GST_FLOW_EOS;
+ if (G_UNLIKELY (*allow_drop))
+ *allow_drop = FALSE;
break;
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, event, &sq->src_segment);
/* Applying the segment may have made the queue non-full again, unblock it if needed */
gst_data_queue_limits_changed (sq->queue);
+ if (G_UNLIKELY (*allow_drop)) {
+ result = GST_FLOW_OK;
+ *allow_drop = FALSE;
+ }
break;
case GST_EVENT_GAP:
apply_gap (mq, sq, event, &sq->src_segment);
@@ -1254,18 +1266,32 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
break;
}
- GST_DEBUG_OBJECT (mq,
- "SingleQueue %d : Pushing event %p of type %s",
- sq->id, event, GST_EVENT_TYPE_NAME (event));
+ if (G_UNLIKELY (*allow_drop)) {
+ GST_DEBUG_OBJECT (mq,
+ "SingleQueue %d : Dropping EOS event %p of type %s",
+ sq->id, event, GST_EVENT_TYPE_NAME (event));
+ gst_event_unref (event);
+ } else {
+ GST_DEBUG_OBJECT (mq,
+ "SingleQueue %d : Pushing event %p of type %s",
+ sq->id, event, GST_EVENT_TYPE_NAME (event));
- gst_pad_push_event (sq->srcpad, event);
+ gst_pad_push_event (sq->srcpad, event);
+ }
} else if (GST_IS_QUERY (object)) {
GstQuery *query;
gboolean res;
query = GST_QUERY_CAST (object);
- res = gst_pad_peer_query (sq->srcpad, query);
+ if (G_UNLIKELY (*allow_drop)) {
+ GST_DEBUG_OBJECT (mq,
+ "SingleQueue %d : Dropping EOS query %p", sq->id, query);
+ gst_query_unref (query);
+ res = FALSE;
+ } else {
+ res = gst_pad_peer_query (sq->srcpad, query);
+ }
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->last_query = res;
@@ -1354,10 +1380,12 @@ gst_multi_queue_loop (GstPad * pad)
GstClockTime next_time;
gboolean is_buffer;
gboolean do_update_buffering = FALSE;
+ gboolean dropping = FALSE;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue;
+next:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
if (sq->flushing)
@@ -1485,7 +1513,7 @@ gst_multi_queue_loop (GstPad * pad)
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Try to push out the new object */
- result = gst_single_queue_push_one (mq, sq, object);
+ result = gst_single_queue_push_one (mq, sq, object, &dropping);
object = NULL;
/* Check if we pushed something already and if this is
@@ -1525,6 +1553,25 @@ gst_multi_queue_loop (GstPad * pad)
if (is_buffer)
sq->pushed = TRUE;
+
+ /* now hold on a bit;
+ * can not simply throw this result to upstream, because
+ * that might already be onto another segment, so we have to make
+ * sure we are relaying the correct info wrt proper segment */
+ if (result == GST_FLOW_EOS && !dropping &&
+ sq->srcresult != GST_FLOW_NOT_LINKED) {
+ GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
+ dropping = TRUE;
+ /* pretend we have not seen EOS yet for upstream's sake */
+ result = sq->srcresult;
+ } else if (dropping && gst_data_queue_is_empty (sq->queue)) {
+ /* queue empty, so stop dropping
+ * we can commit the result we have now,
+ * which is either OK after a segment, or EOS */
+ GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
+ dropping = FALSE;
+ result = GST_FLOW_EOS;
+ }
sq->srcresult = result;
sq->last_oldid = newid;
@@ -1534,6 +1581,9 @@ gst_multi_queue_loop (GstPad * pad)
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_multi_queue_post_buffering (mq);
+ if (dropping)
+ goto next;
+
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
&& result != GST_FLOW_EOS)
goto out_flushing;
@@ -1807,6 +1857,11 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
case GST_EVENT_SEGMENT:
apply_segment (mq, sq, sref, &sq->sink_segment);
gst_event_unref (sref);
+ /* a new segment allows us to accept more buffers if we got EOS
+ * from downstream */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ sq->srcresult = GST_FLOW_OK;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
case GST_EVENT_GAP:
apply_gap (mq, sq, sref, &sq->sink_segment);