summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorSebastian Dröge <slomo@circular-chaos.org>2013-05-27 13:01:43 +0200
committerSebastian Dröge <slomo@circular-chaos.org>2013-05-27 13:01:43 +0200
commitb6fac17502957bb4e10e73926da87f289a1d94b1 (patch)
treef3be3452e6290ff851b008912b9063e6e03690ed /plugins
parent73895c05b116bf0fcfc7c2b12d3f1b04688c8525 (diff)
queue/queue2/multiqueue: When flushing, make sure to not lose any sticky events
https://bugzilla.gnome.org/show_bug.cgi?id=688824
Diffstat (limited to 'plugins')
-rw-r--r--plugins/elements/gstmultiqueue.c52
-rw-r--r--plugins/elements/gstqueue.c17
-rw-r--r--plugins/elements/gstqueue2.c12
3 files changed, 65 insertions, 16 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c
index fd5fe2e12..1f258e80b 100644
--- a/plugins/elements/gstmultiqueue.c
+++ b/plugins/elements/gstmultiqueue.c
@@ -198,6 +198,8 @@ static void compute_high_time (GstMultiQueue * mq);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
+static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
+
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
@@ -733,7 +735,8 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
}
static gboolean
-gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
+gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
+ gboolean full)
{
gboolean result;
@@ -760,7 +763,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->sink_tainted = sq->src_tainted = TRUE;
} else {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- gst_data_queue_flush (sq->queue);
+ gst_single_queue_flush_queue (sq, full);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */
@@ -1372,7 +1375,7 @@ out_flushing:
* so empty this one and trigger dynamic queue growth. At
* this point the srcresult is not OK, NOT_LINKED
* or EOS, i.e. a real failure */
- gst_data_queue_flush (sq->queue);
+ gst_single_queue_flush_queue (sq, FALSE);
single_queue_underrun_cb (sq->queue, sq);
gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad);
@@ -1511,7 +1514,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
res = gst_pad_push_event (sq->srcpad, event);
- gst_single_queue_flush (mq, sq, TRUE);
+ gst_single_queue_flush (mq, sq, TRUE, FALSE);
goto done;
case GST_EVENT_FLUSH_STOP:
@@ -1520,7 +1523,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
res = gst_pad_push_event (sq->srcpad, event);
- gst_single_queue_flush (mq, sq, FALSE);
+ gst_single_queue_flush (mq, sq, FALSE, FALSE);
goto done;
case GST_EVENT_SEGMENT:
/* take ref because the queue will take ownership and we need the event
@@ -1646,9 +1649,9 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
switch (mode) {
case GST_PAD_MODE_PUSH:
if (active) {
- result = gst_single_queue_flush (mq, sq, FALSE);
+ result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
} else {
- result = gst_single_queue_flush (mq, sq, TRUE);
+ result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
/* make sure streaming finishes */
result |= gst_pad_stop_task (pad);
}
@@ -1952,6 +1955,41 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
}
static void
+gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
+{
+ GstDataQueueItem *sitem;
+ gboolean was_flushing = FALSE;
+
+ while (!gst_data_queue_is_empty (sq->queue)) {
+ GstMiniObject *data;
+
+ /* FIXME: If this fails here although the queue is not empty,
+ * we're flushing... but we want to rescue all sticky
+ * events nonetheless.
+ */
+ if (!gst_data_queue_pop (sq->queue, &sitem)) {
+ was_flushing = TRUE;
+ gst_data_queue_set_flushing (sq->queue, FALSE);
+ continue;
+ }
+
+ data = sitem->object;
+
+ if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
+ GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+ && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+ gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
+ }
+
+ sitem->destroy (sitem);
+ }
+
+ gst_data_queue_flush (sq->queue);
+ if (was_flushing)
+ gst_data_queue_set_flushing (sq->queue, TRUE);
+}
+
+static void
gst_single_queue_free (GstSingleQueue * sq)
{
/* DRAIN QUEUE */
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index d3780ba23..834826571 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -209,7 +209,7 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
-static void gst_queue_locked_flush (GstQueue * queue);
+static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active);
@@ -573,7 +573,7 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
}
static void
-gst_queue_locked_flush (GstQueue * queue)
+gst_queue_locked_flush (GstQueue * queue, gboolean full)
{
GstMiniObject *data;
@@ -581,6 +581,11 @@ gst_queue_locked_flush (GstQueue * queue)
data = gst_queue_array_pop_head (queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
+ if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
+ GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+ && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+ gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+ }
if (!GST_IS_QUERY (data))
gst_mini_object_unref (data);
}
@@ -627,7 +632,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
/* Zero the thresholds, this makes sure the queue is completely
* filled and we can read all data from the queue. */
if (queue->flush_on_eos)
- gst_queue_locked_flush (queue);
+ gst_queue_locked_flush (queue, FALSE);
else
GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
/* mark the queue as EOS. This prevents us from accepting more data. */
@@ -758,7 +763,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_pad_push_event (queue->srcpad, event);
GST_QUEUE_MUTEX_LOCK (queue);
- gst_queue_locked_flush (queue);
+ gst_queue_locked_flush (queue, FALSE);
queue->srcresult = GST_FLOW_OK;
queue->eos = FALSE;
queue->unexpected = FALSE;
@@ -1228,7 +1233,7 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (ret));
if (ret == GST_FLOW_FLUSHING)
- gst_queue_locked_flush (queue);
+ gst_queue_locked_flush (queue, FALSE);
else
GST_QUEUE_SIGNAL_DEL (queue);
GST_QUEUE_MUTEX_UNLOCK (queue);
@@ -1377,7 +1382,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
/* step 1, unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_FLUSHING;
- gst_queue_locked_flush (queue);
+ gst_queue_locked_flush (queue, TRUE);
GST_QUEUE_MUTEX_UNLOCK (queue);
}
result = TRUE;
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c
index c1f1ce662..90395dff7 100644
--- a/plugins/elements/gstqueue2.c
+++ b/plugins/elements/gstqueue2.c
@@ -1520,7 +1520,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
}
static void
-gst_queue2_locked_flush (GstQueue2 * queue)
+gst_queue2_locked_flush (GstQueue2 * queue, gboolean full)
{
if (!QUEUE_IS_USING_QUEUE (queue)) {
if (QUEUE_IS_USING_TEMP_FILE (queue))
@@ -1530,6 +1530,12 @@ gst_queue2_locked_flush (GstQueue2 * queue)
while (!g_queue_is_empty (&queue->queue)) {
GstMiniObject *data = g_queue_pop_head (&queue->queue);
+ if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
+ GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+ && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+ gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+ }
+
/* Then lose another reference because we are supposed to destroy that
data when flushing */
if (!GST_IS_QUERY (data))
@@ -2202,7 +2208,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
gst_pad_push_event (queue->srcpad, event);
GST_QUEUE2_MUTEX_LOCK (queue);
- gst_queue2_locked_flush (queue);
+ gst_queue2_locked_flush (queue, FALSE);
queue->srcresult = GST_FLOW_OK;
queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE;
@@ -3043,7 +3049,7 @@ gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_FLUSHING;
queue->sinkresult = GST_FLOW_FLUSHING;
- gst_queue2_locked_flush (queue);
+ gst_queue2_locked_flush (queue, TRUE);
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
result = TRUE;