summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2010-10-28 13:27:43 +0100
committerWim Taymans <wim.taymans@collabora.co.uk>2010-11-03 18:51:54 +0100
commit1edcec637c5d1f82921b179e06d79102bf44095c (patch)
tree5462cb70dcc178aab98ff850f5c1f2f4c90b2daa
parent19b72a3f6bd272db4a34aebf9caa7e2a0879db17 (diff)
bus: make the bus almost lockfreelockfree-bus
Use new GstPoll functionality to wakeup the mainloop. Use a lockfree queue on the writer side to post the messages. The reader side it protected with the lock still because we don't want multiple concurrent readers.
-rw-r--r--gst/gstbus.c147
-rw-r--r--gst/gstbus.h3
2 files changed, 49 insertions, 101 deletions
diff --git a/gst/gstbus.c b/gst/gstbus.c
index 2639d0e84..94c2b6396 100644
--- a/gst/gstbus.c
+++ b/gst/gstbus.c
@@ -75,6 +75,7 @@
#include <sys/types.h>
#include "gstinfo.h"
+#include "gstpoll.h"
#include "gstbus.h"
@@ -90,8 +91,6 @@ enum
static void gst_bus_dispose (GObject * object);
-static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
-
static GstObjectClass *parent_class = NULL;
static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
@@ -100,7 +99,9 @@ struct _GstBusPrivate
guint num_sync_message_emitters;
GCond *queue_cond;
GSource *watch_id;
- GMainContext *main_context;
+
+ GstPoll *poll;
+ GPollFD pollfd;
};
G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
@@ -184,12 +185,15 @@ gst_bus_class_init (GstBusClass * klass)
static void
gst_bus_init (GstBus * bus)
{
- bus->queue = g_queue_new ();
+ bus->queue = gst_lf_queue_new (50);
bus->queue_lock = g_mutex_new ();
bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
bus->priv->queue_cond = g_cond_new ();
+ bus->priv->poll = gst_poll_new_timer ();
+ gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
+
GST_DEBUG_OBJECT (bus, "created");
}
@@ -203,63 +207,24 @@ gst_bus_dispose (GObject * object)
g_mutex_lock (bus->queue_lock);
do {
- message = g_queue_pop_head (bus->queue);
+ message = gst_lf_queue_pop (bus->queue);
if (message)
gst_message_unref (message);
} while (message != NULL);
- g_queue_free (bus->queue);
+ gst_lf_queue_free (bus->queue);
bus->queue = NULL;
g_mutex_unlock (bus->queue_lock);
g_mutex_free (bus->queue_lock);
bus->queue_lock = NULL;
g_cond_free (bus->priv->queue_cond);
bus->priv->queue_cond = NULL;
- }
- if (bus->priv->main_context) {
- g_main_context_unref (bus->priv->main_context);
- bus->priv->main_context = NULL;
+ gst_poll_free (bus->priv->poll);
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
-static void
-gst_bus_wakeup_main_context (GstBus * bus)
-{
- GMainContext *ctx;
-
- GST_OBJECT_LOCK (bus);
- if ((ctx = bus->priv->main_context))
- g_main_context_ref (ctx);
- GST_OBJECT_UNLOCK (bus);
-
- g_main_context_wakeup (ctx);
-
- if (ctx)
- g_main_context_unref (ctx);
-}
-
-static void
-gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
-{
- GST_OBJECT_LOCK (bus);
-
- if (bus->priv->main_context != NULL) {
- g_main_context_unref (bus->priv->main_context);
- bus->priv->main_context = NULL;
- }
-
- if (ctx != NULL) {
- bus->priv->main_context = g_main_context_ref (ctx);
- }
-
- GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
- ctx, g_main_context_default ());
-
- GST_OBJECT_UNLOCK (bus);
-}
-
/**
* gst_bus_new:
*
@@ -335,14 +300,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
case GST_BUS_PASS:
/* pass the message to the async queue, refcount passed in the queue */
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
- g_mutex_lock (bus->queue_lock);
- g_queue_push_tail (bus->queue, message);
- g_cond_broadcast (bus->priv->queue_cond);
- g_mutex_unlock (bus->queue_lock);
+ gst_lf_queue_push (bus->queue, message);
+ gst_poll_write_control (bus->priv->poll);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
- gst_bus_wakeup_main_context (bus);
-
break;
case GST_BUS_ASYNC:
{
@@ -360,12 +321,9 @@ gst_bus_post (GstBus * bus, GstMessage * message)
* queue. When the message is handled by the app and destroyed,
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
- g_mutex_lock (bus->queue_lock);
- g_queue_push_tail (bus->queue, message);
- g_cond_broadcast (bus->priv->queue_cond);
- g_mutex_unlock (bus->queue_lock);
- gst_bus_wakeup_main_context (bus);
+ gst_lf_queue_push (bus->queue, message);
+ gst_poll_write_control (bus->priv->poll);
/* now block till the message is freed */
g_cond_wait (cond, lock);
@@ -413,10 +371,8 @@ gst_bus_have_pending (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
- g_mutex_lock (bus->queue_lock);
/* see if there is a message on the bus */
- result = !g_queue_is_empty (bus->queue);
- g_mutex_unlock (bus->queue_lock);
+ result = gst_lf_queue_length (bus->queue) != 0;
return result;
}
@@ -482,7 +438,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
GstMessageType types)
{
GstMessage *message;
- GTimeVal *timeval, abstimeout;
+ GTimeVal now, then;
gboolean first_round = TRUE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
@@ -491,9 +447,12 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
g_mutex_lock (bus->queue_lock);
while (TRUE) {
- GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
+ gint ret;
+
+ GST_LOG_OBJECT (bus, "have %d messages", gst_lf_queue_length (bus->queue));
- while ((message = g_queue_pop_head (bus->queue))) {
+ while ((message = gst_lf_queue_pop (bus->queue))) {
+ gst_poll_read_control (bus->priv->poll);
GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
if ((GST_MESSAGE_TYPE (message) & types) != 0) {
@@ -510,28 +469,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
if (timeout == 0)
break;
- if (timeout == GST_CLOCK_TIME_NONE) {
- /* wait forever */
- timeval = NULL;
- } else if (first_round) {
- glong add = timeout / 1000;
-
- if (add == 0)
- /* no need to wait */
- break;
-
- /* make timeout absolute */
- g_get_current_time (&abstimeout);
- g_time_val_add (&abstimeout, add);
- timeval = &abstimeout;
- first_round = FALSE;
- GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
- } else {
- /* calculated the absolute end time already, no need to do it again */
- GST_DEBUG_OBJECT (bus, "blocking for message, again");
- timeval = &abstimeout; /* fool compiler */
+ else if (timeout != GST_CLOCK_TIME_NONE) {
+ if (first_round) {
+ g_get_current_time (&then);
+ first_round = FALSE;
+ } else {
+ GstClockTime elapsed;
+
+ g_get_current_time (&now);
+
+ elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
+ if (timeout > elapsed)
+ timeout -= elapsed;
+ else
+ timeout = 0;
+ }
}
- if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
+
+ g_mutex_unlock (bus->queue_lock);
+ ret = gst_poll_wait (bus->priv->poll, timeout);
+ g_mutex_lock (bus->queue_lock);
+
+ if (ret == 0) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
break;
} else {
@@ -643,7 +602,7 @@ gst_bus_peek (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
- message = g_queue_peek_head (bus->queue);
+ message = gst_lf_queue_peek (bus->queue);
if (message)
gst_message_ref (message);
g_mutex_unlock (bus->queue_lock);
@@ -701,24 +660,13 @@ typedef struct
{
GSource source;
GstBus *bus;
- gboolean inited;
} GstBusSource;
static gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
- GstBusSource *bsrc = (GstBusSource *) source;
-
- /* we do this here now that we know that we're attached to a main context
- * (we don't support detaching a source from a main context and then
- * re-attaching it to a different main context) */
- if (G_UNLIKELY (!bsrc->inited)) {
- gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
- bsrc->inited = TRUE;
- }
-
*timeout = -1;
- return gst_bus_have_pending (bsrc->bus);
+ return FALSE;
}
static gboolean
@@ -726,7 +674,7 @@ gst_bus_source_check (GSource * source)
{
GstBusSource *bsrc = (GstBusSource *) source;
- return gst_bus_have_pending (bsrc->bus);
+ return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
}
static gboolean
@@ -788,7 +736,6 @@ gst_bus_source_finalize (GSource * source)
bus->priv->watch_id = NULL;
GST_OBJECT_UNLOCK (bus);
- gst_bus_set_main_context (bsource->bus, NULL);
gst_object_unref (bsource->bus);
bsource->bus = NULL;
}
@@ -820,7 +767,7 @@ gst_bus_create_watch (GstBus * bus)
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
source->bus = gst_object_ref (bus);
- source->inited = FALSE;
+ g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
return (GSource *) source;
}
diff --git a/gst/gstbus.h b/gst/gstbus.h
index 30afe6006..92a27c9f1 100644
--- a/gst/gstbus.h
+++ b/gst/gstbus.h
@@ -28,6 +28,7 @@ typedef struct _GstBusClass GstBusClass;
#include <gst/gstmessage.h>
#include <gst/gstclock.h>
+#include <gst/gstlfqueue.h>
G_BEGIN_DECLS
@@ -115,7 +116,7 @@ struct _GstBus
GstObject object;
/*< private >*/
- GQueue *queue;
+ GstLFQueue *queue;
GMutex *queue_lock;
GstBusSyncHandler sync_handler;