diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-10-28 13:27:43 +0100 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-11-03 18:51:54 +0100 |
commit | 1edcec637c5d1f82921b179e06d79102bf44095c (patch) | |
tree | 5462cb70dcc178aab98ff850f5c1f2f4c90b2daa | |
parent | 19b72a3f6bd272db4a34aebf9caa7e2a0879db17 (diff) |
bus: make the bus almost lockfreelockfree-bus
Use new GstPoll functionality to wakeup the mainloop.
Use a lockfree queue on the writer side to post the messages.
The reader side it protected with the lock still because we don't want multiple
concurrent readers.
-rw-r--r-- | gst/gstbus.c | 147 | ||||
-rw-r--r-- | gst/gstbus.h | 3 |
2 files changed, 49 insertions, 101 deletions
diff --git a/gst/gstbus.c b/gst/gstbus.c index 2639d0e84..94c2b6396 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -75,6 +75,7 @@ #include <sys/types.h> #include "gstinfo.h" +#include "gstpoll.h" #include "gstbus.h" @@ -90,8 +91,6 @@ enum static void gst_bus_dispose (GObject * object); -static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx); - static GstObjectClass *parent_class = NULL; static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; @@ -100,7 +99,9 @@ struct _GstBusPrivate guint num_sync_message_emitters; GCond *queue_cond; GSource *watch_id; - GMainContext *main_context; + + GstPoll *poll; + GPollFD pollfd; }; G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT); @@ -184,12 +185,15 @@ gst_bus_class_init (GstBusClass * klass) static void gst_bus_init (GstBus * bus) { - bus->queue = g_queue_new (); + bus->queue = gst_lf_queue_new (50); bus->queue_lock = g_mutex_new (); bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate); bus->priv->queue_cond = g_cond_new (); + bus->priv->poll = gst_poll_new_timer (); + gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd); + GST_DEBUG_OBJECT (bus, "created"); } @@ -203,63 +207,24 @@ gst_bus_dispose (GObject * object) g_mutex_lock (bus->queue_lock); do { - message = g_queue_pop_head (bus->queue); + message = gst_lf_queue_pop (bus->queue); if (message) gst_message_unref (message); } while (message != NULL); - g_queue_free (bus->queue); + gst_lf_queue_free (bus->queue); bus->queue = NULL; g_mutex_unlock (bus->queue_lock); g_mutex_free (bus->queue_lock); bus->queue_lock = NULL; g_cond_free (bus->priv->queue_cond); bus->priv->queue_cond = NULL; - } - if (bus->priv->main_context) { - g_main_context_unref (bus->priv->main_context); - bus->priv->main_context = NULL; + gst_poll_free (bus->priv->poll); } G_OBJECT_CLASS (parent_class)->dispose (object); } -static void -gst_bus_wakeup_main_context (GstBus * bus) -{ - GMainContext *ctx; - - GST_OBJECT_LOCK (bus); - if ((ctx = bus->priv->main_context)) - g_main_context_ref (ctx); - GST_OBJECT_UNLOCK (bus); - - g_main_context_wakeup (ctx); - - if (ctx) - g_main_context_unref (ctx); -} - -static void -gst_bus_set_main_context (GstBus * bus, GMainContext * ctx) -{ - GST_OBJECT_LOCK (bus); - - if (bus->priv->main_context != NULL) { - g_main_context_unref (bus->priv->main_context); - bus->priv->main_context = NULL; - } - - if (ctx != NULL) { - bus->priv->main_context = g_main_context_ref (ctx); - } - - GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p", - ctx, g_main_context_default ()); - - GST_OBJECT_UNLOCK (bus); -} - /** * gst_bus_new: * @@ -335,14 +300,10 @@ gst_bus_post (GstBus * bus, GstMessage * message) case GST_BUS_PASS: /* pass the message to the async queue, refcount passed in the queue */ GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); - g_mutex_lock (bus->queue_lock); - g_queue_push_tail (bus->queue, message); - g_cond_broadcast (bus->priv->queue_cond); - g_mutex_unlock (bus->queue_lock); + gst_lf_queue_push (bus->queue, message); + gst_poll_write_control (bus->priv->poll); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); - gst_bus_wakeup_main_context (bus); - break; case GST_BUS_ASYNC: { @@ -360,12 +321,9 @@ gst_bus_post (GstBus * bus, GstMessage * message) * queue. When the message is handled by the app and destroyed, * the cond will be signalled and we can continue */ g_mutex_lock (lock); - g_mutex_lock (bus->queue_lock); - g_queue_push_tail (bus->queue, message); - g_cond_broadcast (bus->priv->queue_cond); - g_mutex_unlock (bus->queue_lock); - gst_bus_wakeup_main_context (bus); + gst_lf_queue_push (bus->queue, message); + gst_poll_write_control (bus->priv->poll); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -413,10 +371,8 @@ gst_bus_have_pending (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), FALSE); - g_mutex_lock (bus->queue_lock); /* see if there is a message on the bus */ - result = !g_queue_is_empty (bus->queue); - g_mutex_unlock (bus->queue_lock); + result = gst_lf_queue_length (bus->queue) != 0; return result; } @@ -482,7 +438,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, GstMessageType types) { GstMessage *message; - GTimeVal *timeval, abstimeout; + GTimeVal now, then; gboolean first_round = TRUE; g_return_val_if_fail (GST_IS_BUS (bus), NULL); @@ -491,9 +447,12 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, g_mutex_lock (bus->queue_lock); while (TRUE) { - GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue)); + gint ret; + + GST_LOG_OBJECT (bus, "have %d messages", gst_lf_queue_length (bus->queue)); - while ((message = g_queue_pop_head (bus->queue))) { + while ((message = gst_lf_queue_pop (bus->queue))) { + gst_poll_read_control (bus->priv->poll); GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u", message, GST_MESSAGE_TYPE_NAME (message), (guint) types); if ((GST_MESSAGE_TYPE (message) & types) != 0) { @@ -510,28 +469,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, if (timeout == 0) break; - if (timeout == GST_CLOCK_TIME_NONE) { - /* wait forever */ - timeval = NULL; - } else if (first_round) { - glong add = timeout / 1000; - - if (add == 0) - /* no need to wait */ - break; - - /* make timeout absolute */ - g_get_current_time (&abstimeout); - g_time_val_add (&abstimeout, add); - timeval = &abstimeout; - first_round = FALSE; - GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add); - } else { - /* calculated the absolute end time already, no need to do it again */ - GST_DEBUG_OBJECT (bus, "blocking for message, again"); - timeval = &abstimeout; /* fool compiler */ + else if (timeout != GST_CLOCK_TIME_NONE) { + if (first_round) { + g_get_current_time (&then); + first_round = FALSE; + } else { + GstClockTime elapsed; + + g_get_current_time (&now); + + elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then); + if (timeout > elapsed) + timeout -= elapsed; + else + timeout = 0; + } } - if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) { + + g_mutex_unlock (bus->queue_lock); + ret = gst_poll_wait (bus->priv->poll, timeout); + g_mutex_lock (bus->queue_lock); + + if (ret == 0) { GST_INFO_OBJECT (bus, "timed out, breaking loop"); break; } else { @@ -643,7 +602,7 @@ gst_bus_peek (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_mutex_lock (bus->queue_lock); - message = g_queue_peek_head (bus->queue); + message = gst_lf_queue_peek (bus->queue); if (message) gst_message_ref (message); g_mutex_unlock (bus->queue_lock); @@ -701,24 +660,13 @@ typedef struct { GSource source; GstBus *bus; - gboolean inited; } GstBusSource; static gboolean gst_bus_source_prepare (GSource * source, gint * timeout) { - GstBusSource *bsrc = (GstBusSource *) source; - - /* we do this here now that we know that we're attached to a main context - * (we don't support detaching a source from a main context and then - * re-attaching it to a different main context) */ - if (G_UNLIKELY (!bsrc->inited)) { - gst_bus_set_main_context (bsrc->bus, g_source_get_context (source)); - bsrc->inited = TRUE; - } - *timeout = -1; - return gst_bus_have_pending (bsrc->bus); + return FALSE; } static gboolean @@ -726,7 +674,7 @@ gst_bus_source_check (GSource * source) { GstBusSource *bsrc = (GstBusSource *) source; - return gst_bus_have_pending (bsrc->bus); + return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR); } static gboolean @@ -788,7 +736,6 @@ gst_bus_source_finalize (GSource * source) bus->priv->watch_id = NULL; GST_OBJECT_UNLOCK (bus); - gst_bus_set_main_context (bsource->bus, NULL); gst_object_unref (bsource->bus); bsource->bus = NULL; } @@ -820,7 +767,7 @@ gst_bus_create_watch (GstBus * bus) source = (GstBusSource *) g_source_new (&gst_bus_source_funcs, sizeof (GstBusSource)); source->bus = gst_object_ref (bus); - source->inited = FALSE; + g_source_add_poll ((GSource *) source, &bus->priv->pollfd); return (GSource *) source; } diff --git a/gst/gstbus.h b/gst/gstbus.h index 30afe6006..92a27c9f1 100644 --- a/gst/gstbus.h +++ b/gst/gstbus.h @@ -28,6 +28,7 @@ typedef struct _GstBusClass GstBusClass; #include <gst/gstmessage.h> #include <gst/gstclock.h> +#include <gst/gstlfqueue.h> G_BEGIN_DECLS @@ -115,7 +116,7 @@ struct _GstBus GstObject object; /*< private >*/ - GQueue *queue; + GstLFQueue *queue; GMutex *queue_lock; GstBusSyncHandler sync_handler; |