summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2010-01-26 15:51:44 +0100
committerWim Taymans <wim.taymans@collabora.co.uk>2010-01-26 15:51:44 +0100
commite1efa320ffefaf43f2527390c9716597b1e936e1 (patch)
tree0c7edae00e32f0fb3046a335bd7bf62c451eea67
parent19c5b2736fb70c253c1fb06c36dc59f76c3e16d5 (diff)
multiqueue: attempt at refactormultiqueue
-rw-r--r--plugins/elements/gstmultiqueue.c151
1 files changed, 82 insertions, 69 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c
index 30f11659c..0f4007c59 100644
--- a/plugins/elements/gstmultiqueue.c
+++ b/plugins/elements/gstmultiqueue.c
@@ -1010,96 +1010,109 @@ gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
return item;
}
-/* Each main loop attempts to push buffers until the return value
- * is not-linked. not-linked pads are not allowed to push data beyond
- * any linked pads, so they don't 'rush ahead of the pack'.
- */
-static void
-gst_multi_queue_loop (GstPad * pad)
+static GstMiniObject *
+dequeue_object (GstMultiQueue * mq, GstSingleQueue * sq, guint32 oldid,
+ guint32 * newid)
{
- GstSingleQueue *sq;
- GstMultiQueueItem *item;
GstDataQueueItem *sitem;
- GstMultiQueue *mq;
+ GstMultiQueueItem *item;
GstMiniObject *object;
- guint32 newid;
- guint32 oldid = G_MAXUINT32;
- GstFlowReturn result;
- sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
- mq = sq->mqueue;
+ GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
- do {
- GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
+next:
+ /* Get something from the queue, blocking until that happens, or we get
+ * flushed */
+ if (!(gst_data_queue_pop (sq->queue, &sitem)))
+ return NULL;
- /* Get something from the queue, blocking until that happens, or we get
- * flushed */
- if (!(gst_data_queue_pop (sq->queue, &sitem)))
- goto out_flushing;
-
- item = (GstMultiQueueItem *) sitem;
- newid = item->posid;
-
- /* steal the object and destroy the item */
- object = gst_multi_queue_item_steal_object (item);
- gst_multi_queue_item_destroy (item);
+ item = (GstMultiQueueItem *) sitem;
+ *newid = item->posid;
- GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
- sq->id, newid, oldid);
+ /* steal the object and destroy the item */
+ object = gst_multi_queue_item_steal_object (item);
+ gst_multi_queue_item_destroy (item);
- /* If we're not-linked, we do some extra work because we might need to
- * wait before pushing. If we're linked but there's a gap in the IDs,
- * or it's the first loop, or we just passed the previous highid,
- * we might need to wake some sleeping pad up, so there's extra work
- * there too */
- if (sq->srcresult == GST_FLOW_NOT_LINKED ||
- (oldid == G_MAXUINT32) || (newid != (oldid + 1)) ||
- oldid > mq->highid) {
- GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
+ sq->id, *newid, oldid);
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ /* If we're not-linked, we do some extra work because we might need to
+ * wait before pushing. If we're linked but there's a gap in the IDs,
+ * or it's the first loop, or we just passed the previous highid,
+ * we might need to wake some sleeping pad up, so there's extra work
+ * there too */
+ if (sq->srcresult == GST_FLOW_NOT_LINKED ||
+ (oldid == G_MAXUINT32) || (*newid != (oldid + 1)) || oldid > mq->highid) {
+ GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
- /* Update the nextid so other threads know when to wake us up */
- sq->nextid = newid;
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- /* Update the oldid (the last ID we output) for highid tracking */
- if (oldid != G_MAXUINT32)
- sq->oldid = oldid;
+ /* Update the nextid so other threads know when to wake us up */
+ sq->nextid = *newid;
- if (sq->srcresult == GST_FLOW_NOT_LINKED) {
- /* Go to sleep until it's time to push this buffer */
+ /* Update the oldid (the last ID we output) for highid tracking */
+ if (oldid != G_MAXUINT32)
+ sq->oldid = oldid;
- /* Recompute the highid */
- compute_high_id (mq);
- while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
- GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
- "newid %u and highid %u", sq->id, newid, mq->highid);
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ /* Go to sleep until it's time to push this buffer */
+ /* Recompute the highid */
+ compute_high_id (mq);
+ while (*newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
+ GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
+ "newid %u and highid %u", sq->id, *newid, mq->highid);
- /* Wake up all non-linked pads before we sleep */
- wake_up_next_non_linked (mq);
- mq->numwaiting++;
- g_cond_wait (sq->turn, mq->qlock);
- mq->numwaiting--;
+ /* Wake up all non-linked pads before we sleep */
+ wake_up_next_non_linked (mq);
- GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
- "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
- }
+ mq->numwaiting++;
+ g_cond_wait (sq->turn, mq->qlock);
+ mq->numwaiting--;
- /* Re-compute the high_id in case someone else pushed */
- compute_high_id (mq);
- } else {
- compute_high_id (mq);
- /* Wake up all non-linked pads */
- wake_up_next_non_linked (mq);
+ GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
+ "wakeup with newid %u and highid %u", sq->id, *newid, mq->highid);
}
- /* We're done waiting, we can clear the nextid */
- sq->nextid = 0;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ /* Re-compute the high_id in case someone else pushed */
+ compute_high_id (mq);
+ } else {
+ compute_high_id (mq);
+ /* Wake up all non-linked pads */
+ wake_up_next_non_linked (mq);
}
+ /* We're done waiting, we can clear the nextid */
+ sq->nextid = 0;
+
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ }
+
+ return object;
+}
+
+/* Each main loop attempts to push buffers until the return value
+ * is not-linked. not-linked pads are not allowed to push data beyond
+ * any linked pads, so they don't 'rush ahead of the pack'.
+ */
+static void
+gst_multi_queue_loop (GstPad * pad)
+{
+ GstSingleQueue *sq;
+ GstMultiQueue *mq;
+ GstMiniObject *object;
+ guint32 newid;
+ guint32 oldid = G_MAXUINT32;
+ GstFlowReturn result;
+
+ sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
+ mq = sq->mqueue;
+
+ do {
+ /* take an object */
+ if (!(object = dequeue_object (mq, sq, oldid, &newid)))
+ goto out_flushing;
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));