diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-01-26 15:51:44 +0100 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-01-26 15:51:44 +0100 |
commit | e1efa320ffefaf43f2527390c9716597b1e936e1 (patch) | |
tree | 0c7edae00e32f0fb3046a335bd7bf62c451eea67 | |
parent | 19c5b2736fb70c253c1fb06c36dc59f76c3e16d5 (diff) |
multiqueue: attempt at refactormultiqueue
-rw-r--r-- | plugins/elements/gstmultiqueue.c | 151 |
1 files changed, 82 insertions, 69 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 30f11659c..0f4007c59 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -1010,96 +1010,109 @@ gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid) return item; } -/* Each main loop attempts to push buffers until the return value - * is not-linked. not-linked pads are not allowed to push data beyond - * any linked pads, so they don't 'rush ahead of the pack'. - */ -static void -gst_multi_queue_loop (GstPad * pad) +static GstMiniObject * +dequeue_object (GstMultiQueue * mq, GstSingleQueue * sq, guint32 oldid, + guint32 * newid) { - GstSingleQueue *sq; - GstMultiQueueItem *item; GstDataQueueItem *sitem; - GstMultiQueue *mq; + GstMultiQueueItem *item; GstMiniObject *object; - guint32 newid; - guint32 oldid = G_MAXUINT32; - GstFlowReturn result; - sq = (GstSingleQueue *) gst_pad_get_element_private (pad); - mq = sq->mqueue; + GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); - do { - GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); +next: + /* Get something from the queue, blocking until that happens, or we get + * flushed */ + if (!(gst_data_queue_pop (sq->queue, &sitem))) + return NULL; - /* Get something from the queue, blocking until that happens, or we get - * flushed */ - if (!(gst_data_queue_pop (sq->queue, &sitem))) - goto out_flushing; - - item = (GstMultiQueueItem *) sitem; - newid = item->posid; - - /* steal the object and destroy the item */ - object = gst_multi_queue_item_steal_object (item); - gst_multi_queue_item_destroy (item); + item = (GstMultiQueueItem *) sitem; + *newid = item->posid; - GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", - sq->id, newid, oldid); + /* steal the object and destroy the item */ + object = gst_multi_queue_item_steal_object (item); + gst_multi_queue_item_destroy (item); - /* If we're not-linked, we do some extra work because we might need to - * wait before pushing. If we're linked but there's a gap in the IDs, - * or it's the first loop, or we just passed the previous highid, - * we might need to wake some sleeping pad up, so there's extra work - * there too */ - if (sq->srcresult == GST_FLOW_NOT_LINKED || - (oldid == G_MAXUINT32) || (newid != (oldid + 1)) || - oldid > mq->highid) { - GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", + sq->id, *newid, oldid); - GST_MULTI_QUEUE_MUTEX_LOCK (mq); + /* If we're not-linked, we do some extra work because we might need to + * wait before pushing. If we're linked but there's a gap in the IDs, + * or it's the first loop, or we just passed the previous highid, + * we might need to wake some sleeping pad up, so there's extra work + * there too */ + if (sq->srcresult == GST_FLOW_NOT_LINKED || + (oldid == G_MAXUINT32) || (*newid != (oldid + 1)) || oldid > mq->highid) { + GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); - /* Update the nextid so other threads know when to wake us up */ - sq->nextid = newid; + GST_MULTI_QUEUE_MUTEX_LOCK (mq); - /* Update the oldid (the last ID we output) for highid tracking */ - if (oldid != G_MAXUINT32) - sq->oldid = oldid; + /* Update the nextid so other threads know when to wake us up */ + sq->nextid = *newid; - if (sq->srcresult == GST_FLOW_NOT_LINKED) { - /* Go to sleep until it's time to push this buffer */ + /* Update the oldid (the last ID we output) for highid tracking */ + if (oldid != G_MAXUINT32) + sq->oldid = oldid; - /* Recompute the highid */ - compute_high_id (mq); - while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { - GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " - "newid %u and highid %u", sq->id, newid, mq->highid); + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + /* Go to sleep until it's time to push this buffer */ + /* Recompute the highid */ + compute_high_id (mq); + while (*newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with " + "newid %u and highid %u", sq->id, *newid, mq->highid); - /* Wake up all non-linked pads before we sleep */ - wake_up_next_non_linked (mq); - mq->numwaiting++; - g_cond_wait (sq->turn, mq->qlock); - mq->numwaiting--; + /* Wake up all non-linked pads before we sleep */ + wake_up_next_non_linked (mq); - GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " - "wakeup with newid %u and highid %u", sq->id, newid, mq->highid); - } + mq->numwaiting++; + g_cond_wait (sq->turn, mq->qlock); + mq->numwaiting--; - /* Re-compute the high_id in case someone else pushed */ - compute_high_id (mq); - } else { - compute_high_id (mq); - /* Wake up all non-linked pads */ - wake_up_next_non_linked (mq); + GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked " + "wakeup with newid %u and highid %u", sq->id, *newid, mq->highid); } - /* We're done waiting, we can clear the nextid */ - sq->nextid = 0; - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + /* Re-compute the high_id in case someone else pushed */ + compute_high_id (mq); + } else { + compute_high_id (mq); + /* Wake up all non-linked pads */ + wake_up_next_non_linked (mq); } + /* We're done waiting, we can clear the nextid */ + sq->nextid = 0; + + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } + + return object; +} + +/* Each main loop attempts to push buffers until the return value + * is not-linked. not-linked pads are not allowed to push data beyond + * any linked pads, so they don't 'rush ahead of the pack'. + */ +static void +gst_multi_queue_loop (GstPad * pad) +{ + GstSingleQueue *sq; + GstMultiQueue *mq; + GstMiniObject *object; + guint32 newid; + guint32 oldid = G_MAXUINT32; + GstFlowReturn result; + + sq = (GstSingleQueue *) gst_pad_get_element_private (pad); + mq = sq->mqueue; + + do { + /* take an object */ + if (!(object = dequeue_object (mq, sq, oldid, &newid))) + goto out_flushing; GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", gst_flow_get_name (sq->srcresult)); |