summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian.droege@collabora.co.uk>2011-03-22 13:19:47 +0100
committerSebastian Dröge <sebastian.droege@collabora.co.uk>2011-05-14 11:39:35 +0200
commit9f831097062205a0382d553d7732e737a9a5dd3f (patch)
tree42e4509ada27a8dd203aec39fe70208a523088fb
parent4a836cae9fa9240d7ed3c6e5a9d7ea9afa484c0a (diff)
multiqueue: Add mode to synchronize deactivated/not-linked streams by the running time
Fixes bug #645107, #600648.
-rw-r--r--plugins/elements/gstmultiqueue.c202
-rw-r--r--plugins/elements/gstmultiqueue.h3
2 files changed, 196 insertions, 9 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c
index b6b2d245a..e1ddc8107 100644
--- a/plugins/elements/gstmultiqueue.c
+++ b/plugins/elements/gstmultiqueue.c
@@ -156,6 +156,8 @@ struct _GstSingleQueue
guint32 nextid; /* ID of the next object waiting to be pushed */
guint32 oldid; /* ID of the last object pushed (last in a series) */
guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */
+ GstClockTime next_time; /* End running time of next buffer to be pushed */
+ GstClockTime last_time; /* Start running time of last pushed buffer */
GCond *turn; /* SingleQueue turn waiting conditional */
};
@@ -179,6 +181,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue);
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
+static void compute_high_time (GstMultiQueue * mq);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
@@ -224,6 +227,7 @@ enum
#define DEFAULT_USE_BUFFERING FALSE
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
+#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
enum
{
@@ -237,6 +241,7 @@ enum
PROP_USE_BUFFERING,
PROP_LOW_PERCENT,
PROP_HIGH_PERCENT,
+ PROP_SYNC_BY_RUNNING_TIME,
PROP_LAST
};
@@ -396,6 +401,22 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
"High threshold for buffering to finish", 0, 100,
DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstMultiQueue:sync-by-running-time
+ *
+ * If enabled multiqueue will synchronize deactivated or not-linked streams
+ * to the activated and linked streams by taking the running time.
+ * Otherwise multiqueue will synchronize the deactivated or not-linked
+ * streams by keeping the order in which buffers and events arrived compared
+ * to active and linked streams.
+ *
+ * Since: 0.10.33
+ */
+ g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
+ g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
+ "Synchronize deactivated or not-linked streams by running time",
+ DEFAULT_SYNC_BY_RUNNING_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gobject_class->finalize = gst_multi_queue_finalize;
@@ -425,8 +446,11 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
mqueue->low_percent = DEFAULT_LOW_PERCENT;
mqueue->high_percent = DEFAULT_HIGH_PERCENT;
+ mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
+
mqueue->counter = 1;
mqueue->highid = -1;
+ mqueue->high_time = GST_CLOCK_TIME_NONE;
mqueue->qlock = g_mutex_new ();
}
@@ -499,6 +523,9 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
case PROP_HIGH_PERCENT:
mq->high_percent = g_value_get_int (value);
break;
+ case PROP_SYNC_BY_RUNNING_TIME:
+ mq->sync_by_running_time = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -541,6 +568,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id,
case PROP_HIGH_PERCENT:
g_value_set_int (value, mq->high_percent);
break;
+ case PROP_SYNC_BY_RUNNING_TIME:
+ g_value_set_boolean (value, mq->sync_by_running_time);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -740,8 +770,15 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->nextid = 0;
sq->oldid = 0;
sq->last_oldid = G_MAXUINT32;
+ sq->next_time = GST_CLOCK_TIME_NONE;
+ sq->last_time = GST_CLOCK_TIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
+ /* Reset high time to be recomputed next */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ mq->high_time = GST_CLOCK_TIME_NONE;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
sq->flushing = FALSE;
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
@@ -946,6 +983,71 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
+static GstClockTime
+get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
+{
+ GstClockTime time = GST_CLOCK_TIME_NONE;
+
+ if (GST_IS_BUFFER (object)) {
+ GstBuffer *buf = GST_BUFFER_CAST (object);
+
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ time = GST_BUFFER_TIMESTAMP (buf);
+ if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+ time += GST_BUFFER_DURATION (buf);
+ if (time > segment->stop)
+ time = segment->stop;
+ time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+ }
+ } else if (GST_IS_BUFFER_LIST (object)) {
+ GstBufferList *list = GST_BUFFER_LIST_CAST (object);
+ GstBufferListIterator *it = gst_buffer_list_iterate (list);
+ GstBuffer *buf;
+
+ do {
+ while ((buf = gst_buffer_list_iterator_next (it))) {
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ time = GST_BUFFER_TIMESTAMP (buf);
+ if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+ time += GST_BUFFER_DURATION (buf);
+ if (time > segment->stop)
+ time = segment->stop;
+ time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+ if (!end)
+ goto done;
+ } else if (!end) {
+ goto done;
+ }
+ }
+ } while (gst_buffer_list_iterator_next_group (it));
+ } else if (GST_IS_EVENT (object)) {
+ GstEvent *event = GST_EVENT_CAST (object);
+
+ /* For newsegment events return the running time of the start position */
+ if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
+ GstSegment new_segment = *segment;
+ gboolean update;
+ gdouble rate, applied_rate;
+ GstFormat format;
+ gint64 start, stop, position;
+
+ gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
+ &format, &start, &stop, &position);
+ if (format == GST_FORMAT_TIME) {
+ gst_segment_set_newsegment_full (&new_segment, update, rate,
+ applied_rate, format, start, stop, position);
+
+ time =
+ gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME,
+ new_segment.start);
+ }
+ }
+ }
+
+done:
+ return time;
+}
+
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object)
@@ -1078,6 +1180,7 @@ gst_multi_queue_loop (GstPad * pad)
GstMiniObject *object = NULL;
guint32 newid;
GstFlowReturn result;
+ GstClockTime next_time;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue;
@@ -1099,6 +1202,9 @@ gst_multi_queue_loop (GstPad * pad)
object = gst_multi_queue_item_steal_object (item);
gst_multi_queue_item_destroy (item);
+ /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
+ next_time = get_running_time (&sq->src_segment, object, TRUE);
+
GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
sq->id, newid, sq->last_oldid);
@@ -1107,9 +1213,9 @@ gst_multi_queue_loop (GstPad * pad)
* or it's the first loop, or we just passed the previous highid,
* we might need to wake some sleeping pad up, so there's extra work
* there too */
- if (sq->srcresult == GST_FLOW_NOT_LINKED ||
- (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) ||
- sq->last_oldid > mq->highid) {
+ if (sq->srcresult == GST_FLOW_NOT_LINKED
+ || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
+ || sq->last_oldid > mq->highid) {
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
@@ -1124,6 +1230,7 @@ gst_multi_queue_loop (GstPad * pad)
/* Update the nextid so other threads know when to wake us up */
sq->nextid = newid;
+ sq->next_time = next_time;
/* Update the oldid (the last ID we output) for highid tracking */
if (sq->last_oldid != G_MAXUINT32)
@@ -1134,10 +1241,20 @@ gst_multi_queue_loop (GstPad * pad)
/* Recompute the highid */
compute_high_id (mq);
- while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
- GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
- "newid %u and highid %u", sq->id, newid, mq->highid);
+ /* Recompute the high time */
+ compute_high_time (mq);
+ while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
+ (mq->high_time == GST_CLOCK_TIME_NONE
+ || next_time >= mq->high_time))
+ || (!mq->sync_by_running_time && newid > mq->highid))
+ && sq->srcresult == GST_FLOW_NOT_LINKED) {
+
+ GST_DEBUG_OBJECT (mq,
+ "queue %d sleeping for not-linked wakeup with "
+ "newid %u, highid %u, next_time %" GST_TIME_FORMAT
+ ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+ GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
/* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
@@ -1151,8 +1268,13 @@ gst_multi_queue_loop (GstPad * pad)
goto out_flushing;
}
+ /* Recompute the high time */
+ compute_high_time (mq);
+
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
- "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
+ "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
+ ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+ GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
}
/* Re-compute the high_id in case someone else pushed */
@@ -1162,8 +1284,9 @@ gst_multi_queue_loop (GstPad * pad)
/* Wake up all non-linked pads */
wake_up_next_non_linked (mq);
}
- /* We're done waiting, we can clear the nextid */
+ /* We're done waiting, we can clear the nextid and nexttime */
sq->nextid = 0;
+ sq->next_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
@@ -1174,6 +1297,18 @@ gst_multi_queue_loop (GstPad * pad)
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
+ /* Update time stats */
+ next_time = get_running_time (&sq->src_segment, object, FALSE);
+ if (next_time != GST_CLOCK_TIME_NONE) {
+ if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
+ sq->last_time = next_time;
+ if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
+ /* Wake up all non-linked pads now that we advanceed the high time */
+ mq->high_time = next_time;
+ wake_up_next_non_linked (mq);
+ }
+ }
+
/* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object);
sq->srcresult = result;
@@ -1187,6 +1322,7 @@ gst_multi_queue_loop (GstPad * pad)
gst_flow_get_name (sq->srcresult));
sq->last_oldid = newid;
+
return;
out_flushing:
@@ -1516,7 +1652,10 @@ wake_up_next_non_linked (GstMultiQueue * mq)
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
- if (sq->nextid != 0 && sq->nextid <= mq->highid) {
+ if ((mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE
+ && sq->next_time != GST_CLOCK_TIME_NONE
+ && sq->next_time >= mq->high_time)
+ || (sq->nextid != 0 && sq->nextid <= mq->highid)) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
g_cond_signal (sq->turn);
}
@@ -1567,6 +1706,49 @@ compute_high_id (GstMultiQueue * mq)
lowest);
}
+/* WITH LOCK TAKEN */
+static void
+compute_high_time (GstMultiQueue * mq)
+{
+ /* The high-id is either the highest id among the linked pads, or if all
+ * pads are not-linked, it's the lowest not-linked pad */
+ GList *tmp;
+ GstClockTime highest = GST_CLOCK_TIME_NONE;
+ GstClockTime lowest = GST_CLOCK_TIME_NONE;
+
+ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+
+ GST_LOG_OBJECT (mq,
+ "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
+ GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
+ GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
+
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ /* No need to consider queues which are not waiting */
+ if (sq->next_time == GST_CLOCK_TIME_NONE) {
+ GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
+ continue;
+ }
+
+ if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
+ lowest = sq->next_time;
+ } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
+ /* If we don't have a global highid, or the global highid is lower than
+ * this single queue's last outputted id, store the queue's one,
+ * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
+ if (highest == GST_CLOCK_TIME_NONE || sq->last_time > highest)
+ highest = sq->last_time;
+ }
+ }
+
+ mq->high_time = highest;
+
+ GST_LOG_OBJECT (mq,
+ "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
+}
+
#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
((q)->max_size.format) <= (value))
@@ -1770,6 +1952,8 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id)
sq->nextid = 0;
sq->oldid = 0;
+ sq->next_time = GST_CLOCK_TIME_NONE;
+ sq->last_time = GST_CLOCK_TIME_NONE;
sq->turn = g_cond_new ();
sq->sinktime = GST_CLOCK_TIME_NONE;
diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h
index b9c28cd44..bb3d84042 100644
--- a/plugins/elements/gstmultiqueue.h
+++ b/plugins/elements/gstmultiqueue.h
@@ -50,6 +50,8 @@ typedef struct _GstMultiQueueClass GstMultiQueueClass;
struct _GstMultiQueue {
GstElement element;
+ gboolean sync_by_running_time;
+
/* number of queues */
guint nbqueues;
@@ -65,6 +67,7 @@ struct _GstMultiQueue {
guint32 counter; /* incoming object counter, use atomic accesses */
guint32 highid; /* contains highest id of last outputted object */
+ GstClockTime high_time; /* highest start running time */
GMutex * qlock; /* Global queue lock (vs object lock or individual */
/* queues lock). Protects nbqueues, queues, global */