diff options
author | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2012-10-22 10:13:20 +0200 |
---|---|---|
committer | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2012-10-25 14:03:01 +0200 |
commit | 09982c3c13b3271d1c99973b1f6e6e4cb475f1bb (patch) | |
tree | 11e0a6a569bfc7be0422d8d9cfa5d5635441c190 /plugins/elements | |
parent | ebbce545831bc96fa3005b3074dbca9e5adc3eff (diff) |
dataqueue/queuearray: Make public API again
These are actually used outside of coreelements nowadays.
Also hide lots of internals and add padding and documentation.
Diffstat (limited to 'plugins/elements')
-rw-r--r-- | plugins/elements/Makefile.am | 4 | ||||
-rw-r--r-- | plugins/elements/gstdataqueue.c | 640 | ||||
-rw-r--r-- | plugins/elements/gstdataqueue.h | 189 | ||||
-rw-r--r-- | plugins/elements/gstmultiqueue.c | 2 | ||||
-rw-r--r-- | plugins/elements/gstmultiqueue.h | 2 | ||||
-rw-r--r-- | plugins/elements/gstqueue.c | 30 | ||||
-rw-r--r-- | plugins/elements/gstqueue.h | 4 | ||||
-rw-r--r-- | plugins/elements/gstqueuearray.c | 175 | ||||
-rw-r--r-- | plugins/elements/gstqueuearray.h | 61 |
9 files changed, 19 insertions, 1088 deletions
diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am index 5e0230c6b..8add0bdab 100644 --- a/plugins/elements/Makefile.am +++ b/plugins/elements/Makefile.am @@ -15,9 +15,7 @@ libgstcoreelements_la_SOURCES = \ gstidentity.c \ gstinputselector.c \ gstoutputselector.c \ - gstdataqueue.c \ gstmultiqueue.c \ - gstqueuearray.c \ gstqueue.c \ gstqueue2.c \ gsttee.c \ @@ -43,9 +41,7 @@ noinst_HEADERS = \ gstidentity.h \ gstinputselector.h \ gstoutputselector.h \ - gstdataqueue.h \ gstmultiqueue.h \ - gstqueuearray.h \ gstqueue.h \ gstqueue2.h \ gsttee.h \ diff --git a/plugins/elements/gstdataqueue.c b/plugins/elements/gstdataqueue.c deleted file mode 100644 index dff49c6e2..000000000 --- a/plugins/elements/gstdataqueue.c +++ /dev/null @@ -1,640 +0,0 @@ -/* GStreamer - * Copyright (C) 2006 Edward Hervey <edward@fluendo.com> - * - * gstdataqueue.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/** - * SECTION:gstdataqueue - * @short_description: Threadsafe queueing object - * - * #GstDataQueue is an object that handles threadsafe queueing of objects. It - * also provides size-related functionality. This object should be used for - * any #GstElement that wishes to provide some sort of queueing functionality. - */ - -#include <gst/gst.h> -#include "string.h" -#include "gstdataqueue.h" -#include "gst/glib-compat-private.h" - -GST_DEBUG_CATEGORY_STATIC (data_queue_debug); -#define GST_CAT_DEFAULT (data_queue_debug) -GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow); - - -/* Queue signals and args */ -enum -{ - SIGNAL_EMPTY, - SIGNAL_FULL, - LAST_SIGNAL -}; - -enum -{ - ARG_0, - ARG_CUR_LEVEL_VISIBLE, - ARG_CUR_LEVEL_BYTES, - ARG_CUR_LEVEL_TIME - /* FILL ME */ -}; - -#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ - GST_CAT_LOG (data_queue_dataflow, \ - "locking qlock from thread %p", \ - g_thread_self ()); \ - g_mutex_lock (&q->qlock); \ - GST_CAT_LOG (data_queue_dataflow, \ - "locked qlock from thread %p", \ - g_thread_self ()); \ -} G_STMT_END - -#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \ - GST_DATA_QUEUE_MUTEX_LOCK (q); \ - if (q->flushing) \ - goto label; \ - } G_STMT_END - -#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ - GST_CAT_LOG (data_queue_dataflow, \ - "unlocking qlock from thread %p", \ - g_thread_self ()); \ - g_mutex_unlock (&q->qlock); \ -} G_STMT_END - -#define STATUS(q, msg) \ - GST_CAT_LOG (data_queue_dataflow, \ - "queue:%p " msg ": %u visible items, %u " \ - "bytes, %"G_GUINT64_FORMAT \ - " ns, %u elements", \ - queue, \ - q->cur_level.visible, \ - q->cur_level.bytes, \ - q->cur_level.time, \ - q->queue.length) - -static void gst_data_queue_finalize (GObject * object); - -static void gst_data_queue_set_property (GObject * object, - guint prop_id, const GValue * value, GParamSpec * pspec); -static void gst_data_queue_get_property (GObject * object, - guint prop_id, GValue * value, GParamSpec * pspec); - -static GObjectClass *parent_class = NULL; -static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 }; - -#define _do_init \ -{ \ - GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \ - "data queue object"); \ - GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \ - "dataflow inside the data queue object"); \ -} - - -G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init); - -static void -gst_data_queue_class_init (GstDataQueueClass * klass) -{ - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - parent_class = g_type_class_peek_parent (klass); - - gobject_class->set_property = gst_data_queue_set_property; - gobject_class->get_property = gst_data_queue_get_property; - - /* signals */ - /** - * GstDataQueue::empty: - * @queue: the queue instance - * - * Reports that the queue became empty (empty). - * A queue is empty if the total amount of visible items inside it (num-visible, time, - * size) is lower than the boundary values which can be set through the GObject - * properties. - */ - gst_data_queue_signals[SIGNAL_EMPTY] = - g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, - G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL, - g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - /** - * GstDataQueue::full: - * @queue: the queue instance - * - * Reports that the queue became full (full). - * A queue is full if the total amount of data inside it (num-visible, time, - * size) is higher than the boundary values which can be set through the GObject - * properties. - */ - gst_data_queue_signals[SIGNAL_FULL] = - g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, - G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL, - g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - /* properties */ - g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, - g_param_spec_uint ("current-level-bytes", "Current level (kB)", - "Current amount of data in the queue (bytes)", - 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE, - g_param_spec_uint ("current-level-visible", - "Current level (visible items)", - "Current number of visible items in the queue", 0, G_MAXUINT, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, - g_param_spec_uint64 ("current-level-time", "Current level (ns)", - "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - - gobject_class->finalize = gst_data_queue_finalize; -} - -static void -gst_data_queue_init (GstDataQueue * queue) -{ - queue->cur_level.visible = 0; /* no content */ - queue->cur_level.bytes = 0; /* no content */ - queue->cur_level.time = 0; /* no content */ - - queue->checkfull = NULL; - - g_mutex_init (&queue->qlock); - g_cond_init (&queue->item_add); - g_cond_init (&queue->item_del); - gst_queue_array_init (&queue->queue, 50); - - GST_DEBUG ("initialized queue's not_empty & not_full conditions"); -} - -/** - * gst_data_queue_new_full: - * @checkfull: the callback used to tell if the element considers the queue full - * or not. - * @fullcallback: the callback which will be called when the queue is considered full. - * @emptycallback: the callback which will be called when the queue is considered empty. - * @checkdata: a #gpointer that will be given in the @checkfull callback. - * - * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will - * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback - * or @emptycallback. - * - * Returns: a new #GstDataQueue. - */ - -GstDataQueue * -gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull, - GstDataQueueFullCallback fullcallback, - GstDataQueueEmptyCallback emptycallback, gpointer checkdata) -{ - GstDataQueue *ret; - - g_return_val_if_fail (checkfull != NULL, NULL); - - ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL); - ret->checkfull = checkfull; - ret->checkdata = checkdata; - ret->fullcallback = fullcallback; - ret->emptycallback = emptycallback; - - return ret; -} - -/** - * gst_data_queue_new: - * @checkfull: the callback used to tell if the element considers the queue full - * or not. - * @checkdata: a #gpointer that will be given in the @checkfull callback. - * - * Returns: a new #GstDataQueue. - */ - -GstDataQueue * -gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata) -{ - return gst_data_queue_new_full (checkfull, NULL, NULL, checkdata); -} - -static void -gst_data_queue_cleanup (GstDataQueue * queue) -{ - while (!gst_queue_array_is_empty (&queue->queue)) { - GstDataQueueItem *item = gst_queue_array_pop_head (&queue->queue); - - /* Just call the destroy notify on the item */ - item->destroy (item); - } - queue->cur_level.visible = 0; - queue->cur_level.bytes = 0; - queue->cur_level.time = 0; -} - -/* called only once, as opposed to dispose */ -static void -gst_data_queue_finalize (GObject * object) -{ - GstDataQueue *queue = GST_DATA_QUEUE (object); - - GST_DEBUG ("finalizing queue"); - - gst_data_queue_cleanup (queue); - gst_queue_array_clear (&queue->queue); - - GST_DEBUG ("free mutex"); - g_mutex_clear (&queue->qlock); - GST_DEBUG ("done free mutex"); - - g_cond_clear (&queue->item_add); - g_cond_clear (&queue->item_del); - - G_OBJECT_CLASS (parent_class)->finalize (object); -} - -static inline void -gst_data_queue_locked_flush (GstDataQueue * queue) -{ - STATUS (queue, "before flushing"); - gst_data_queue_cleanup (queue); - STATUS (queue, "after flushing"); - /* we deleted something... */ - if (queue->waiting_del) - g_cond_signal (&queue->item_del); -} - -static inline gboolean -gst_data_queue_locked_is_empty (GstDataQueue * queue) -{ - return (queue->queue.length == 0); -} - -static inline gboolean -gst_data_queue_locked_is_full (GstDataQueue * queue) -{ - return queue->checkfull (queue, queue->cur_level.visible, - queue->cur_level.bytes, queue->cur_level.time, queue->checkdata); -} - -/** - * gst_data_queue_flush: - * @queue: a #GstDataQueue. - * - * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and - * #gst_data_queue_pop will be released. - * MT safe. - */ -void -gst_data_queue_flush (GstDataQueue * queue) -{ - GST_DEBUG ("queue:%p", queue); - GST_DATA_QUEUE_MUTEX_LOCK (queue); - gst_data_queue_locked_flush (queue); - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); -} - -/** - * gst_data_queue_is_empty: - * @queue: a #GstDataQueue. - * - * Queries if there are any items in the @queue. - * MT safe. - * - * Returns: #TRUE if @queue is empty. - */ -gboolean -gst_data_queue_is_empty (GstDataQueue * queue) -{ - gboolean res; - - GST_DATA_QUEUE_MUTEX_LOCK (queue); - res = gst_data_queue_locked_is_empty (queue); - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - - return res; -} - -/** - * gst_data_queue_is_full: - * @queue: a #GstDataQueue. - * - * Queries if @queue is full. This check will be done using the - * #GstDataQueueCheckFullFunction registered with @queue. - * MT safe. - * - * Returns: #TRUE if @queue is full. - */ -gboolean -gst_data_queue_is_full (GstDataQueue * queue) -{ - gboolean res; - - GST_DATA_QUEUE_MUTEX_LOCK (queue); - res = gst_data_queue_locked_is_full (queue); - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - - return res; -} - -/** - * gst_data_queue_set_flushing: - * @queue: a #GstDataQueue. - * @flushing: a #gboolean stating if the queue will be flushing or not. - * - * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing - * state, any incoming data on the @queue will be discarded. Any call currently - * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight - * away with a return value of #FALSE. While the @queue is in flushing state, - * all calls to those two functions will return #FALSE. - * - * MT Safe. - */ -void -gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) -{ - GST_DEBUG ("queue:%p , flushing:%d", queue, flushing); - - GST_DATA_QUEUE_MUTEX_LOCK (queue); - queue->flushing = flushing; - if (flushing) { - /* release push/pop functions */ - if (queue->waiting_add) - g_cond_signal (&queue->item_add); - if (queue->waiting_del) - g_cond_signal (&queue->item_del); - } - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); -} - -/** - * gst_data_queue_push: - * @queue: a #GstDataQueue. - * @item: a #GstDataQueueItem. - * - * Pushes a #GstDataQueueItem (or a structure that begins with the same fields) - * on the @queue. If the @queue is full, the call will block until space is - * available, OR the @queue is set to flushing state. - * MT safe. - * - * Note that this function has slightly different semantics than gst_pad_push() - * and gst_pad_push_event(): this function only takes ownership of @item and - * the #GstMiniObject contained in @item if the push was successful. If FALSE - * is returned, the caller is responsible for freeing @item and its contents. - * - * Returns: #TRUE if the @item was successfully pushed on the @queue. - */ -gboolean -gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) -{ - g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); - g_return_val_if_fail (item != NULL, FALSE); - - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); - - STATUS (queue, "before pushing"); - - /* We ALWAYS need to check for queue fillness */ - if (gst_data_queue_locked_is_full (queue)) { - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - if (G_LIKELY (queue->fullcallback)) - queue->fullcallback (queue, queue->checkdata); - else - g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0); - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); - - /* signal might have removed some items */ - while (gst_data_queue_locked_is_full (queue)) { - queue->waiting_del = TRUE; - g_cond_wait (&queue->item_del, &queue->qlock); - queue->waiting_del = FALSE; - if (queue->flushing) - goto flushing; - } - } - - gst_queue_array_push_tail (&queue->queue, item); - - if (item->visible) - queue->cur_level.visible++; - queue->cur_level.bytes += item->size; - queue->cur_level.time += item->duration; - - STATUS (queue, "after pushing"); - if (queue->waiting_add) - g_cond_signal (&queue->item_add); - - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - - return TRUE; - - /* ERRORS */ -flushing: - { - GST_DEBUG ("queue:%p, we are flushing", queue); - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - return FALSE; - } -} - -/** - * gst_data_queue_pop: - * @queue: a #GstDataQueue. - * @item: pointer to store the returned #GstDataQueueItem. - * - * Retrieves the first @item available on the @queue. If the queue is currently - * empty, the call will block until at least one item is available, OR the - * @queue is set to the flushing state. - * MT safe. - * - * Returns: #TRUE if an @item was successfully retrieved from the @queue. - */ -gboolean -gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) -{ - g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); - g_return_val_if_fail (item != NULL, FALSE); - - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); - - STATUS (queue, "before popping"); - - if (gst_data_queue_locked_is_empty (queue)) { - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - if (G_LIKELY (queue->emptycallback)) - queue->emptycallback (queue, queue->checkdata); - else - g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0); - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); - - while (gst_data_queue_locked_is_empty (queue)) { - queue->waiting_add = TRUE; - g_cond_wait (&queue->item_add, &queue->qlock); - queue->waiting_add = FALSE; - if (queue->flushing) - goto flushing; - } - } - - /* Get the item from the GQueue */ - *item = gst_queue_array_pop_head (&queue->queue); - - /* update current level counter */ - if ((*item)->visible) - queue->cur_level.visible--; - queue->cur_level.bytes -= (*item)->size; - queue->cur_level.time -= (*item)->duration; - - STATUS (queue, "after popping"); - if (queue->waiting_del) - g_cond_signal (&queue->item_del); - - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - - return TRUE; - - /* ERRORS */ -flushing: - { - GST_DEBUG ("queue:%p, we are flushing", queue); - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - return FALSE; - } -} - -static gint -is_of_type (gconstpointer a, gconstpointer b) -{ - return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_INT (b)); -} - -/** - * gst_data_queue_drop_head: - * @queue: The #GstDataQueue to drop an item from. - * @type: The #GType of the item to drop. - * - * Pop and unref the head-most #GstMiniObject with the given #GType. - * - * Returns: TRUE if an element was removed. - */ -gboolean -gst_data_queue_drop_head (GstDataQueue * queue, GType type) -{ - gboolean res = FALSE; - GstDataQueueItem *leak = NULL; - guint idx; - - g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); - - GST_DEBUG ("queue:%p", queue); - - GST_DATA_QUEUE_MUTEX_LOCK (queue); - idx = - gst_queue_array_find (&queue->queue, is_of_type, GINT_TO_POINTER (type)); - - if (idx == -1) - goto done; - - leak = queue->queue.array[idx]; - gst_queue_array_drop_element (&queue->queue, idx); - - if (leak->visible) - queue->cur_level.visible--; - queue->cur_level.bytes -= leak->size; - queue->cur_level.time -= leak->duration; - - leak->destroy (leak); - - res = TRUE; - -done: - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - - GST_DEBUG ("queue:%p , res:%d", queue, res); - - return res; -} - -/** - * gst_data_queue_limits_changed: - * @queue: The #GstDataQueue - * - * Inform the queue that the limits for the fullness check have changed and that - * any blocking gst_data_queue_push() should be unblocked to recheck the limts. - */ -void -gst_data_queue_limits_changed (GstDataQueue * queue) -{ - g_return_if_fail (GST_IS_DATA_QUEUE (queue)); - - GST_DATA_QUEUE_MUTEX_LOCK (queue); - if (queue->waiting_del) { - GST_DEBUG ("signal del"); - g_cond_signal (&queue->item_del); - } - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); -} - -/** - * gst_data_queue_get_level: - * @queue: The #GstDataQueue - * @level: the location to store the result - * - * Get the current level of the queue. - */ -void -gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level) -{ - memcpy (level, (&queue->cur_level), sizeof (GstDataQueueSize)); -} - -static void -gst_data_queue_set_property (GObject * object, - guint prop_id, const GValue * value, GParamSpec * pspec) -{ - switch (prop_id) { - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_data_queue_get_property (GObject * object, - guint prop_id, GValue * value, GParamSpec * pspec) -{ - GstDataQueue *queue = GST_DATA_QUEUE (object); - - GST_DATA_QUEUE_MUTEX_LOCK (queue); - - switch (prop_id) { - case ARG_CUR_LEVEL_BYTES: - g_value_set_uint (value, queue->cur_level.bytes); - break; - case ARG_CUR_LEVEL_VISIBLE: - g_value_set_uint (value, queue->cur_level.visible); - break; - case ARG_CUR_LEVEL_TIME: - g_value_set_uint64 (value, queue->cur_level.time); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } - - GST_DATA_QUEUE_MUTEX_UNLOCK (queue); -} diff --git a/plugins/elements/gstdataqueue.h b/plugins/elements/gstdataqueue.h deleted file mode 100644 index 52fdeafa7..000000000 --- a/plugins/elements/gstdataqueue.h +++ /dev/null @@ -1,189 +0,0 @@ -/* GStreamer - * Copyright (C) 2006 Edward Hervey <edward@fluendo.com> - * - * gstdataqueue.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_DATA_QUEUE_H__ -#define __GST_DATA_QUEUE_H__ - -#include <gst/gst.h> -#include "gstqueuearray.h" - -G_BEGIN_DECLS -#define GST_TYPE_DATA_QUEUE \ - (gst_data_queue_get_type()) -#define GST_DATA_QUEUE(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_DATA_QUEUE,GstDataQueue)) -#define GST_DATA_QUEUE_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_DATA_QUEUE,GstDataQueueClass)) -#define GST_IS_DATA_QUEUE(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_DATA_QUEUE)) -#define GST_IS_DATA_QUEUE_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_DATA_QUEUE)) -typedef struct _GstDataQueue GstDataQueue; -typedef struct _GstDataQueueClass GstDataQueueClass; -typedef struct _GstDataQueueSize GstDataQueueSize; -typedef struct _GstDataQueueItem GstDataQueueItem; - -/** - * GstDataQueueItem: - * @object: the #GstMiniObject to queue. - * @size: the size in bytes of the miniobject. - * @duration: the duration in #GstClockTime of the miniobject. Can not be - * #GST_CLOCK_TIME_NONE. - * @visible: #TRUE if @object should be considered as a visible object. - * @destroy: The #GDestroyNotify function to use to free the #GstDataQueueItem. - * This function should also drop the reference to @object the owner of the - * #GstDataQueueItem is assumed to hold. - * - * Structure used by #GstDataQueue. You can supply a different structure, as - * long as the top of the structure is identical to this structure. - */ - -struct _GstDataQueueItem -{ - GstMiniObject *object; - guint size; - guint64 duration; - gboolean visible; - - /* user supplied destroy function */ - GDestroyNotify destroy; -}; - -/** - * GstDataQueueSize: - * @visible: number of buffers - * @bytes: number of bytes - * @time: amount of time - * - * Structure describing the size of a queue. - */ -struct _GstDataQueueSize -{ - guint visible; - guint bytes; - guint64 time; -}; - -/** - * GstDataQueueCheckFullFunction: - * @queue: a #GstDataQueue. - * @visible: The number of visible items currently in the queue. - * @bytes: The amount of bytes currently in the queue. - * @time: The accumulated duration of the items currently in the queue. - * @checkdata: The #gpointer registered when the #GstDataQueue was created. - * - * The prototype of the function used to inform the queue that it should be - * considered as full. - * - * Returns: #TRUE if the queue should be considered full. - */ -typedef gboolean (*GstDataQueueCheckFullFunction) (GstDataQueue * queue, - guint visible, guint bytes, guint64 time, gpointer checkdata); - -typedef void (*GstDataQueueFullCallback) (GstDataQueue * queue, gpointer checkdata); -typedef void (*GstDataQueueEmptyCallback) (GstDataQueue * queue, gpointer checkdata); - -/** - * GstDataQueue: - * @object: the parent structure - * - * Opaque #GstDataQueue structure. - */ -struct _GstDataQueue -{ - GObject object; - - /*< private >*/ - /* the array of data we're keeping our grubby hands on */ - GstQueueArray queue; - - GstDataQueueSize cur_level; /* size of the queue */ - GstDataQueueCheckFullFunction checkfull; /* Callback to check if the queue is full */ - gpointer *checkdata; - - GMutex qlock; /* lock for queue (vs object lock) */ - gboolean waiting_add; - GCond item_add; /* signals buffers now available for reading */ - gboolean waiting_del; - GCond item_del; /* signals space now available for writing */ - gboolean flushing; /* indicates whether conditions where signalled because - * of external flushing */ - GstDataQueueFullCallback fullcallback; - GstDataQueueEmptyCallback emptycallback; - - /* gpointer _gst_reserved[GST_PADDING]; */ -}; - -struct _GstDataQueueClass -{ - GObjectClass parent_class; - - /* signals */ - void (*empty) (GstDataQueue * queue); - void (*full) (GstDataQueue * queue); - - /* gpointer _gst_reserved[GST_PADDING]; */ -}; - -G_GNUC_INTERNAL -GType gst_data_queue_get_type (void); - -G_GNUC_INTERNAL -GstDataQueue * gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, - gpointer checkdata) G_GNUC_MALLOC; - -G_GNUC_INTERNAL -GstDataQueue * gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull, - GstDataQueueFullCallback fullcallback, - GstDataQueueEmptyCallback emptycallback, - gpointer checkdata) G_GNUC_MALLOC; - -G_GNUC_INTERNAL -gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); - -G_GNUC_INTERNAL -gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); - -G_GNUC_INTERNAL -void gst_data_queue_flush (GstDataQueue * queue); - -G_GNUC_INTERNAL -void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); - -G_GNUC_INTERNAL -gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); - -G_GNUC_INTERNAL -gboolean gst_data_queue_is_full (GstDataQueue * queue); - -G_GNUC_INTERNAL -gboolean gst_data_queue_is_empty (GstDataQueue * queue); - -G_GNUC_INTERNAL -void gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize *level); - -G_GNUC_INTERNAL -void gst_data_queue_limits_changed (GstDataQueue * queue); - -G_END_DECLS - -#endif /* __GST_DATA_QUEUE_H__ */ diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 8afcebb4a..22686ab88 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -1983,7 +1983,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->mqueue = mqueue; sq->srcresult = GST_FLOW_FLUSHING; sq->pushed = FALSE; - sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction) + sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) single_queue_check_full, (GstDataQueueFullCallback) single_queue_overrun_cb, (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq); diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index 986dc22af..8053ef96e 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -24,7 +24,7 @@ #define __GST_MULTI_QUEUE_H__ #include <gst/gst.h> -#include "gstdataqueue.h" +#include <gst/base/gstdataqueue.h> G_BEGIN_DECLS diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 29898230b..8b06fe1d6 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -92,7 +92,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_dataflow); queue->cur_level.time, \ queue->min_threshold.time, \ queue->max_size.time, \ - queue->queue.length) + gst_queue_array_get_length (queue->queue)) /* Queue signals and args */ enum @@ -417,7 +417,7 @@ gst_queue_init (GstQueue * queue) g_cond_init (&queue->item_add); g_cond_init (&queue->item_del); - gst_queue_array_init (&queue->queue, DEFAULT_MAX_SIZE_BUFFERS * 3 / 2); + queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2); queue->sinktime = GST_CLOCK_TIME_NONE; queue->srctime = GST_CLOCK_TIME_NONE; @@ -440,13 +440,13 @@ gst_queue_finalize (GObject * object) GST_DEBUG_OBJECT (queue, "finalizing queue"); - while (!gst_queue_array_is_empty (&queue->queue)) { - data = gst_queue_array_pop_head (&queue->queue); + while (!gst_queue_array_is_empty (queue->queue)) { + data = gst_queue_array_pop_head (queue->queue); /* FIXME: if it's a query, shouldn't we unref that too? */ if (!GST_IS_QUERY (data)) gst_mini_object_unref (data); } - gst_queue_array_clear (&queue->queue); + gst_queue_array_free (queue->queue); g_mutex_clear (&queue->qlock); g_cond_clear (&queue->item_add); @@ -556,8 +556,8 @@ gst_queue_locked_flush (GstQueue * queue) { GstMiniObject *data; - while (!gst_queue_array_is_empty (&queue->queue)) { - data = gst_queue_array_pop_head (&queue->queue); + while (!gst_queue_array_is_empty (queue->queue)) { + data = gst_queue_array_pop_head (queue->queue); /* Then lose another reference because we are supposed to destroy that data when flushing */ if (!GST_IS_QUERY (data)) @@ -590,7 +590,7 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE); if (item) - gst_queue_array_push_tail (&queue->queue, item); + gst_queue_array_push_tail (queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -611,7 +611,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) case GST_EVENT_SEGMENT: apply_segment (queue, event, &queue->sink_segment, TRUE); /* if the queue is empty, apply sink segment on the source */ - if (queue->queue.length == 0) { + if (gst_queue_array_is_empty (queue->queue)) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad"); apply_segment (queue, event, &queue->src_segment, FALSE); queue->newseg_applied_to_src = TRUE; @@ -625,7 +625,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) } if (item) - gst_queue_array_push_tail (&queue->queue, item); + gst_queue_array_push_tail (queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -635,7 +635,7 @@ gst_queue_locked_dequeue (GstQueue * queue) { GstMiniObject *item; - item = gst_queue_array_pop_head (&queue->queue); + item = gst_queue_array_pop_head (queue->queue); if (item == NULL) goto no_item; @@ -792,9 +792,9 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_LOG_OBJECT (queue, "queuing query %p (%s)", query, GST_QUERY_TYPE_NAME (query)); - gst_queue_array_push_tail (&queue->queue, query); + gst_queue_array_push_tail (queue->queue, query); GST_QUEUE_SIGNAL_ADD (queue); - while (queue->queue.length != 0) { + while (!gst_queue_array_is_empty (queue->queue)) { /* for as long as the queue has items, we know the query is * not handled yet */ GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); @@ -822,14 +822,14 @@ gst_queue_is_empty (GstQueue * queue) { GstMiniObject *head; - if (queue->queue.length == 0) + if (gst_queue_array_is_empty (queue->queue)) return TRUE; /* Only consider the queue empty if the minimum thresholds * are not reached and data is at the queue head. Otherwise * we would block forever on serialized queries. */ - head = queue->queue.array[queue->queue.head]; + head = gst_queue_array_peek_head (queue->queue); if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head)) return FALSE; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 1ed123e4f..a262d6190 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -25,7 +25,7 @@ #define __GST_QUEUE_H__ #include <gst/gst.h> -#include "gstqueuearray.h" +#include <gst/base/gstqueuearray.h> G_BEGIN_DECLS @@ -108,7 +108,7 @@ struct _GstQueue { gboolean eos; /* the queue of data we're keeping our grubby hands on */ - GstQueueArray queue; + GstQueueArray *queue; GstQueueSize cur_level, /* currently in the queue */ diff --git a/plugins/elements/gstqueuearray.c b/plugins/elements/gstqueuearray.c deleted file mode 100644 index f16f7ae01..000000000 --- a/plugins/elements/gstqueuearray.c +++ /dev/null @@ -1,175 +0,0 @@ -/* GStreamer - * Copyright (C) 2009 Edward Hervey <bilboed@bilboed.com> - * - * gstqueuearray.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#include <string.h> -#include <gst/gst.h> -#include "gstqueuearray.h" - -void -gst_queue_array_init (GstQueueArray * array, guint initial_size) -{ - array->size = initial_size; - array->array = g_new0 (gpointer, initial_size); - array->head = 0; - array->tail = 0; - array->length = 0; - -} - -GstQueueArray * -gst_queue_array_new (guint initial_size) -{ - GstQueueArray *array; - - array = g_new (GstQueueArray, 1); - gst_queue_array_init (array, initial_size); - return array; -} - -gpointer -gst_queue_array_pop_head (GstQueueArray * array) -{ - gpointer ret; - - /* empty array */ - if (G_UNLIKELY (array->length == 0)) - return NULL; - ret = array->array[array->head]; - array->head++; - array->head %= array->size; - array->length--; - return ret; -} - -void -gst_queue_array_push_tail (GstQueueArray * array, gpointer data) -{ - /* Check if we need to make room */ - if (G_UNLIKELY (array->length == array->size)) { - /* newsize is 50% bigger */ - guint newsize = (3 * array->size) / 2; - - /* copy over data */ - if (array->tail != 0) { - gpointer *array2 = g_new0 (gpointer, newsize); - guint t1 = array->head; - guint t2 = array->size - array->head; - - /* [0-----TAIL][HEAD------SIZE] - * - * We want to end up with - * [HEAD------------------TAIL][----FREEDATA------NEWSIZE] - * - * 1) move [HEAD-----SIZE] part to beginning of new array - * 2) move [0-------TAIL] part new array, after previous part - */ - - memcpy (array2, &array->array[array->head], t2 * sizeof (gpointer)); - memcpy (&array2[t2], array->array, t1 * sizeof (gpointer)); - - g_free (array->array); - array->array = array2; - array->head = 0; - } else { - /* Fast path, we just need to grow the array */ - array->array = g_renew (gpointer, array->array, newsize); - } - array->tail = array->size; - array->size = newsize; - } - - array->array[array->tail] = data; - array->tail++; - array->tail %= array->size; - array->length++; -} - -gboolean -gst_queue_array_is_empty (GstQueueArray * array) -{ - return (array->length == 0); -} - -void -gst_queue_array_clear (GstQueueArray * array) -{ - g_free (array->array); -} - -void -gst_queue_array_free (GstQueueArray * array) -{ - gst_queue_array_clear (array); - g_free (array); -} - -void -gst_queue_array_drop_element (GstQueueArray * array, guint idx) -{ - if (idx == array->head) { - /* just move the head */ - array->head++; - array->head %= array->size; - return; - } - if (idx == array->tail - 1) { - /* just move the tail */ - array->tail = (array->tail - 1 + array->size) % array->size; - return; - } - /* drop the element #idx... and readjust the array */ - if (array->head < array->tail) { - /* Make sure it's within the boundaries */ - g_assert (array->head < idx && idx <= array->tail); - /* ends not wrapped */ - /* move head-idx to head+1 */ - memcpy (&array->array[array->head + 1], - &array->array[array->head], (idx - array->head) * sizeof (gpointer)); - array->tail--; - } else { - /* ends are wrapped */ - if (idx < array->tail) { - /* move idx-tail backwards one */ - memcpy (&array->array[idx - 1], - &array->array[idx], (array->tail - idx) * sizeof (gpointer)); - array->tail--; - } else if (idx >= array->head) { - /* move head-idx forwards one */ - memcpy (&array->array[array->head], - &array->array[array->head + 1], - (idx - array->head) * sizeof (gpointer)); - array->head++; - } else - g_assert_not_reached (); - } -} - -guint -gst_queue_array_find (GstQueueArray * array, GCompareFunc func, gpointer data) -{ - guint i; - - /* Scan from head to tail */ - for (i = array->head; i < array->length; i = (i + 1) % array->size) - if (func (array->array[i], data) == 0) - return i; - return -1; -} diff --git a/plugins/elements/gstqueuearray.h b/plugins/elements/gstqueuearray.h deleted file mode 100644 index 510cbf47d..000000000 --- a/plugins/elements/gstqueuearray.h +++ /dev/null @@ -1,61 +0,0 @@ -/* GStreamer - * Copyright (C) 2009-2010 Edward Hervey <bilboed@bilboed.com> - * - * gstqueuearray.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#include <glib.h> - -#ifndef __GST_QUEUE_ARRAY_H__ -#define __GST_QUEUE_ARRAY_H__ - -typedef struct _GstQueueArray GstQueueArray; - -struct _GstQueueArray -{ - gpointer *array; - guint size; - guint head; - guint tail; - guint length; -}; - -G_GNUC_INTERNAL void gst_queue_array_init (GstQueueArray * array, - guint initial_size); - -G_GNUC_INTERNAL void gst_queue_array_clear (GstQueueArray * array); - -G_GNUC_INTERNAL GstQueueArray * gst_queue_array_new (guint initial_size); - -G_GNUC_INTERNAL gpointer gst_queue_array_pop_head (GstQueueArray * array); - -G_GNUC_INTERNAL void gst_queue_array_push_tail (GstQueueArray * array, - gpointer data); - -G_GNUC_INTERNAL gboolean gst_queue_array_is_empty (GstQueueArray * array); - -G_GNUC_INTERNAL void gst_queue_array_free (GstQueueArray * array); - -G_GNUC_INTERNAL void gst_queue_array_drop_element (GstQueueArray * array, - guint idx); - -G_GNUC_INTERNAL guint gst_queue_array_find (GstQueueArray * array, - GCompareFunc func, - gpointer data); - -#endif |