diff options
author | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2011-03-22 13:19:47 +0100 |
---|---|---|
committer | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2011-05-14 11:39:35 +0200 |
commit | 9f831097062205a0382d553d7732e737a9a5dd3f (patch) | |
tree | 42e4509ada27a8dd203aec39fe70208a523088fb | |
parent | 4a836cae9fa9240d7ed3c6e5a9d7ea9afa484c0a (diff) |
multiqueue: Add mode to synchronize deactivated/not-linked streams by the running time
Fixes bug #645107, #600648.
-rw-r--r-- | plugins/elements/gstmultiqueue.c | 202 | ||||
-rw-r--r-- | plugins/elements/gstmultiqueue.h | 3 |
2 files changed, 196 insertions, 9 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index b6b2d245a..e1ddc8107 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -156,6 +156,8 @@ struct _GstSingleQueue guint32 nextid; /* ID of the next object waiting to be pushed */ guint32 oldid; /* ID of the last object pushed (last in a series) */ guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */ + GstClockTime next_time; /* End running time of next buffer to be pushed */ + GstClockTime last_time; /* Start running time of last pushed buffer */ GCond *turn; /* SingleQueue turn waiting conditional */ }; @@ -179,6 +181,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue); static void wake_up_next_non_linked (GstMultiQueue * mq); static void compute_high_id (GstMultiQueue * mq); +static void compute_high_time (GstMultiQueue * mq); static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); @@ -224,6 +227,7 @@ enum #define DEFAULT_USE_BUFFERING FALSE #define DEFAULT_LOW_PERCENT 10 #define DEFAULT_HIGH_PERCENT 99 +#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE enum { @@ -237,6 +241,7 @@ enum PROP_USE_BUFFERING, PROP_LOW_PERCENT, PROP_HIGH_PERCENT, + PROP_SYNC_BY_RUNNING_TIME, PROP_LAST }; @@ -396,6 +401,22 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass) "High threshold for buffering to finish", 0, 100, DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstMultiQueue:sync-by-running-time + * + * If enabled multiqueue will synchronize deactivated or not-linked streams + * to the activated and linked streams by taking the running time. + * Otherwise multiqueue will synchronize the deactivated or not-linked + * streams by keeping the order in which buffers and events arrived compared + * to active and linked streams. + * + * Since: 0.10.33 + */ + g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME, + g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time", + "Synchronize deactivated or not-linked streams by running time", + DEFAULT_SYNC_BY_RUNNING_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gobject_class->finalize = gst_multi_queue_finalize; @@ -425,8 +446,11 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) mqueue->low_percent = DEFAULT_LOW_PERCENT; mqueue->high_percent = DEFAULT_HIGH_PERCENT; + mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME; + mqueue->counter = 1; mqueue->highid = -1; + mqueue->high_time = GST_CLOCK_TIME_NONE; mqueue->qlock = g_mutex_new (); } @@ -499,6 +523,9 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, case PROP_HIGH_PERCENT: mq->high_percent = g_value_get_int (value); break; + case PROP_SYNC_BY_RUNNING_TIME: + mq->sync_by_running_time = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -541,6 +568,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id, case PROP_HIGH_PERCENT: g_value_set_int (value, mq->high_percent); break; + case PROP_SYNC_BY_RUNNING_TIME: + g_value_set_boolean (value, mq->sync_by_running_time); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -740,8 +770,15 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->nextid = 0; sq->oldid = 0; sq->last_oldid = G_MAXUINT32; + sq->next_time = GST_CLOCK_TIME_NONE; + sq->last_time = GST_CLOCK_TIME_NONE; gst_data_queue_set_flushing (sq->queue, FALSE); + /* Reset high time to be recomputed next */ + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + mq->high_time = GST_CLOCK_TIME_NONE; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + sq->flushing = FALSE; GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); @@ -946,6 +983,71 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } +static GstClockTime +get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end) +{ + GstClockTime time = GST_CLOCK_TIME_NONE; + + if (GST_IS_BUFFER (object)) { + GstBuffer *buf = GST_BUFFER_CAST (object); + + if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { + time = GST_BUFFER_TIMESTAMP (buf); + if (end && GST_BUFFER_DURATION_IS_VALID (buf)) + time += GST_BUFFER_DURATION (buf); + if (time > segment->stop) + time = segment->stop; + time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time); + } + } else if (GST_IS_BUFFER_LIST (object)) { + GstBufferList *list = GST_BUFFER_LIST_CAST (object); + GstBufferListIterator *it = gst_buffer_list_iterate (list); + GstBuffer *buf; + + do { + while ((buf = gst_buffer_list_iterator_next (it))) { + if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { + time = GST_BUFFER_TIMESTAMP (buf); + if (end && GST_BUFFER_DURATION_IS_VALID (buf)) + time += GST_BUFFER_DURATION (buf); + if (time > segment->stop) + time = segment->stop; + time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time); + if (!end) + goto done; + } else if (!end) { + goto done; + } + } + } while (gst_buffer_list_iterator_next_group (it)); + } else if (GST_IS_EVENT (object)) { + GstEvent *event = GST_EVENT_CAST (object); + + /* For newsegment events return the running time of the start position */ + if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) { + GstSegment new_segment = *segment; + gboolean update; + gdouble rate, applied_rate; + GstFormat format; + gint64 start, stop, position; + + gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate, + &format, &start, &stop, &position); + if (format == GST_FORMAT_TIME) { + gst_segment_set_newsegment_full (&new_segment, update, rate, + applied_rate, format, start, stop, position); + + time = + gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME, + new_segment.start); + } + } + } + +done: + return time; +} + static GstFlowReturn gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, GstMiniObject * object) @@ -1078,6 +1180,7 @@ gst_multi_queue_loop (GstPad * pad) GstMiniObject *object = NULL; guint32 newid; GstFlowReturn result; + GstClockTime next_time; sq = (GstSingleQueue *) gst_pad_get_element_private (pad); mq = sq->mqueue; @@ -1099,6 +1202,9 @@ gst_multi_queue_loop (GstPad * pad) object = gst_multi_queue_item_steal_object (item); gst_multi_queue_item_destroy (item); + /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */ + next_time = get_running_time (&sq->src_segment, object, TRUE); + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", sq->id, newid, sq->last_oldid); @@ -1107,9 +1213,9 @@ gst_multi_queue_loop (GstPad * pad) * or it's the first loop, or we just passed the previous highid, * we might need to wake some sleeping pad up, so there's extra work * there too */ - if (sq->srcresult == GST_FLOW_NOT_LINKED || - (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) || - sq->last_oldid > mq->highid) { + if (sq->srcresult == GST_FLOW_NOT_LINKED + || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) + || sq->last_oldid > mq->highid) { GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", gst_flow_get_name (sq->srcresult)); @@ -1124,6 +1230,7 @@ gst_multi_queue_loop (GstPad * pad) /* Update the nextid so other threads know when to wake us up */ sq->nextid = newid; + sq->next_time = next_time; /* Update the oldid (the last ID we output) for highid tracking */ if (sq->last_oldid != G_MAXUINT32) @@ -1134,10 +1241,20 @@ gst_multi_queue_loop (GstPad * pad) /* Recompute the highid */ compute_high_id (mq); - while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { - GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " - "newid %u and highid %u", sq->id, newid, mq->highid); + /* Recompute the high time */ + compute_high_time (mq); + while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE && + (mq->high_time == GST_CLOCK_TIME_NONE + || next_time >= mq->high_time)) + || (!mq->sync_by_running_time && newid > mq->highid)) + && sq->srcresult == GST_FLOW_NOT_LINKED) { + + GST_DEBUG_OBJECT (mq, + "queue %d sleeping for not-linked wakeup with " + "newid %u, highid %u, next_time %" GST_TIME_FORMAT + ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid, + GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time)); /* Wake up all non-linked pads before we sleep */ wake_up_next_non_linked (mq); @@ -1151,8 +1268,13 @@ gst_multi_queue_loop (GstPad * pad) goto out_flushing; } + /* Recompute the high time */ + compute_high_time (mq); + GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " - "wakeup with newid %u and highid %u", sq->id, newid, mq->highid); + "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT + ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid, + GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time)); } /* Re-compute the high_id in case someone else pushed */ @@ -1162,8 +1284,9 @@ gst_multi_queue_loop (GstPad * pad) /* Wake up all non-linked pads */ wake_up_next_non_linked (mq); } - /* We're done waiting, we can clear the nextid */ + /* We're done waiting, we can clear the nextid and nexttime */ sq->nextid = 0; + sq->next_time = GST_CLOCK_TIME_NONE; GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } @@ -1174,6 +1297,18 @@ gst_multi_queue_loop (GstPad * pad) GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", gst_flow_get_name (sq->srcresult)); + /* Update time stats */ + next_time = get_running_time (&sq->src_segment, object, FALSE); + if (next_time != GST_CLOCK_TIME_NONE) { + if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time) + sq->last_time = next_time; + if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) { + /* Wake up all non-linked pads now that we advanceed the high time */ + mq->high_time = next_time; + wake_up_next_non_linked (mq); + } + } + /* Try to push out the new object */ result = gst_single_queue_push_one (mq, sq, object); sq->srcresult = result; @@ -1187,6 +1322,7 @@ gst_multi_queue_loop (GstPad * pad) gst_flow_get_name (sq->srcresult)); sq->last_oldid = newid; + return; out_flushing: @@ -1516,7 +1652,10 @@ wake_up_next_non_linked (GstMultiQueue * mq) GstSingleQueue *sq = (GstSingleQueue *) tmp->data; if (sq->srcresult == GST_FLOW_NOT_LINKED) { - if (sq->nextid != 0 && sq->nextid <= mq->highid) { + if ((mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE + && sq->next_time != GST_CLOCK_TIME_NONE + && sq->next_time >= mq->high_time) + || (sq->nextid != 0 && sq->nextid <= mq->highid)) { GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id); g_cond_signal (sq->turn); } @@ -1567,6 +1706,49 @@ compute_high_id (GstMultiQueue * mq) lowest); } +/* WITH LOCK TAKEN */ +static void +compute_high_time (GstMultiQueue * mq) +{ + /* The high-id is either the highest id among the linked pads, or if all + * pads are not-linked, it's the lowest not-linked pad */ + GList *tmp; + GstClockTime highest = GST_CLOCK_TIME_NONE; + GstClockTime lowest = GST_CLOCK_TIME_NONE; + + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + + GST_LOG_OBJECT (mq, + "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%" + GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time), + GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult)); + + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + /* No need to consider queues which are not waiting */ + if (sq->next_time == GST_CLOCK_TIME_NONE) { + GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id); + continue; + } + + if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest) + lowest = sq->next_time; + } else if (sq->srcresult != GST_FLOW_UNEXPECTED) { + /* If we don't have a global highid, or the global highid is lower than + * this single queue's last outputted id, store the queue's one, + * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */ + if (highest == GST_CLOCK_TIME_NONE || sq->last_time > highest) + highest = sq->last_time; + } + } + + mq->high_time = highest; + + GST_LOG_OBJECT (mq, + "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %" + GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest)); +} + #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \ ((q)->max_size.format) <= (value)) @@ -1770,6 +1952,8 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id) sq->nextid = 0; sq->oldid = 0; + sq->next_time = GST_CLOCK_TIME_NONE; + sq->last_time = GST_CLOCK_TIME_NONE; sq->turn = g_cond_new (); sq->sinktime = GST_CLOCK_TIME_NONE; diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index b9c28cd44..bb3d84042 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -50,6 +50,8 @@ typedef struct _GstMultiQueueClass GstMultiQueueClass; struct _GstMultiQueue { GstElement element; + gboolean sync_by_running_time; + /* number of queues */ guint nbqueues; @@ -65,6 +67,7 @@ struct _GstMultiQueue { guint32 counter; /* incoming object counter, use atomic accesses */ guint32 highid; /* contains highest id of last outputted object */ + GstClockTime high_time; /* highest start running time */ GMutex * qlock; /* Global queue lock (vs object lock or individual */ /* queues lock). Protects nbqueues, queues, global */ |