diff options
-rw-r--r-- | plugins/elements/gstmultiqueue.c | 79 |
1 files changed, 67 insertions, 12 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index c60596bac..aef27ad37 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -1209,7 +1209,7 @@ done: static GstFlowReturn gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, - GstMiniObject * object) + GstMiniObject * object, gboolean * allow_drop) { GstFlowReturn result = sq->srcresult; @@ -1226,11 +1226,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, /* Applying the buffer may have made the queue non-full again, unblock it if needed */ gst_data_queue_limits_changed (sq->queue); - GST_DEBUG_OBJECT (mq, - "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, - sq->id, buffer, GST_TIME_ARGS (timestamp)); - - result = gst_pad_push (sq->srcpad, buffer); + if (G_UNLIKELY (*allow_drop)) { + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT, + sq->id, buffer, GST_TIME_ARGS (timestamp)); + gst_buffer_unref (buffer); + } else { + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT, + sq->id, buffer, GST_TIME_ARGS (timestamp)); + result = gst_pad_push (sq->srcpad, buffer); + } } else if (GST_IS_EVENT (object)) { GstEvent *event; @@ -1239,11 +1245,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: result = GST_FLOW_EOS; + if (G_UNLIKELY (*allow_drop)) + *allow_drop = FALSE; break; case GST_EVENT_SEGMENT: apply_segment (mq, sq, event, &sq->src_segment); /* Applying the segment may have made the queue non-full again, unblock it if needed */ gst_data_queue_limits_changed (sq->queue); + if (G_UNLIKELY (*allow_drop)) { + result = GST_FLOW_OK; + *allow_drop = FALSE; + } break; case GST_EVENT_GAP: apply_gap (mq, sq, event, &sq->src_segment); @@ -1254,18 +1266,32 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, break; } - GST_DEBUG_OBJECT (mq, - "SingleQueue %d : Pushing event %p of type %s", - sq->id, event, GST_EVENT_TYPE_NAME (event)); + if (G_UNLIKELY (*allow_drop)) { + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Dropping EOS event %p of type %s", + sq->id, event, GST_EVENT_TYPE_NAME (event)); + gst_event_unref (event); + } else { + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Pushing event %p of type %s", + sq->id, event, GST_EVENT_TYPE_NAME (event)); - gst_pad_push_event (sq->srcpad, event); + gst_pad_push_event (sq->srcpad, event); + } } else if (GST_IS_QUERY (object)) { GstQuery *query; gboolean res; query = GST_QUERY_CAST (object); - res = gst_pad_peer_query (sq->srcpad, query); + if (G_UNLIKELY (*allow_drop)) { + GST_DEBUG_OBJECT (mq, + "SingleQueue %d : Dropping EOS query %p", sq->id, query); + gst_query_unref (query); + res = FALSE; + } else { + res = gst_pad_peer_query (sq->srcpad, query); + } GST_MULTI_QUEUE_MUTEX_LOCK (mq); sq->last_query = res; @@ -1354,10 +1380,12 @@ gst_multi_queue_loop (GstPad * pad) GstClockTime next_time; gboolean is_buffer; gboolean do_update_buffering = FALSE; + gboolean dropping = FALSE; sq = (GstSingleQueue *) gst_pad_get_element_private (pad); mq = sq->mqueue; +next: GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); if (sq->flushing) @@ -1485,7 +1513,7 @@ gst_multi_queue_loop (GstPad * pad) GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* Try to push out the new object */ - result = gst_single_queue_push_one (mq, sq, object); + result = gst_single_queue_push_one (mq, sq, object, &dropping); object = NULL; /* Check if we pushed something already and if this is @@ -1525,6 +1553,25 @@ gst_multi_queue_loop (GstPad * pad) if (is_buffer) sq->pushed = TRUE; + + /* now hold on a bit; + * can not simply throw this result to upstream, because + * that might already be onto another segment, so we have to make + * sure we are relaying the correct info wrt proper segment */ + if (result == GST_FLOW_EOS && !dropping && + sq->srcresult != GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id); + dropping = TRUE; + /* pretend we have not seen EOS yet for upstream's sake */ + result = sq->srcresult; + } else if (dropping && gst_data_queue_is_empty (sq->queue)) { + /* queue empty, so stop dropping + * we can commit the result we have now, + * which is either OK after a segment, or EOS */ + GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id); + dropping = FALSE; + result = GST_FLOW_EOS; + } sq->srcresult = result; sq->last_oldid = newid; @@ -1534,6 +1581,9 @@ gst_multi_queue_loop (GstPad * pad) GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); gst_multi_queue_post_buffering (mq); + if (dropping) + goto next; + if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED && result != GST_FLOW_EOS) goto out_flushing; @@ -1807,6 +1857,11 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) case GST_EVENT_SEGMENT: apply_segment (mq, sq, sref, &sq->sink_segment); gst_event_unref (sref); + /* a new segment allows us to accept more buffers if we got EOS + * from downstream */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + sq->srcresult = GST_FLOW_OK; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case GST_EVENT_GAP: apply_gap (mq, sq, sref, &sq->sink_segment); |