summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim-Philipp Müller <tim@centricular.com>2015-02-16 22:39:42 +0000
committerTim-Philipp Müller <tim@centricular.com>2015-02-17 09:52:09 +0000
commitda7847d1adde088b4d8bcfc76642cf942e735dd3 (patch)
treecb9f399b3e8882e2c5e998912cc739f828f7f097
parent9f58aa080a41759462ff5771cb259ee64a7ea51e (diff)
message, bus: fix async message delivery
Async message delivery (where the posting thread gets blocked until the message has been processed and/or freed) was pretty much completely broken. For one, don't use GMutex implementation details to check whether a mutex has been initialized or not, esp. not implementation details that don't hold true any more with newer GLib versions where atomic ops and futexes are used (spotted by Josep Torras). This led to async message delivery no longer blocking with newer GLib versions on Linux. Secondly, after async delivery don't free mutex/GCond embedded inside the just-freed message structure. Use a new (private) mini object flag to signal GstMessage that the message being freed is part of an async delivery on the bus so that the dispose handler can keep the message alive and the bus can free it once it's done cleaning up stuff.
-rw-r--r--gst/gst_private.h3
-rw-r--r--gst/gstbus.c12
-rw-r--r--gst/gstmessage.c29
3 files changed, 37 insertions, 7 deletions
diff --git a/gst/gst_private.h b/gst/gst_private.h
index 8c9cb4bbe..d0aefb4a5 100644
--- a/gst/gst_private.h
+++ b/gst/gst_private.h
@@ -426,5 +426,8 @@ struct _GstDeviceProviderFactoryClass {
gpointer _gst_reserved[GST_PADDING];
};
+/* privat flag used by GstBus / GstMessage */
+#define GST_MESSAGE_FLAG_ASYNC_DELIVERY (GST_MINI_OBJECT_FLAG_LAST << 0)
+
G_END_DECLS
#endif /* __GST_PRIVATE_H__ */
diff --git a/gst/gstbus.c b/gst/gstbus.c
index ff458cd54..13b05462a 100644
--- a/gst/gstbus.c
+++ b/gst/gstbus.c
@@ -308,6 +308,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message,
message);
+ /* check we didn't accidentally add a public flag that maps to same value */
+ g_assert (!GST_MINI_OBJECT_FLAG_IS_SET (message,
+ GST_MESSAGE_FLAG_ASYNC_DELIVERY));
+
GST_OBJECT_LOCK (bus);
/* check if the bus is flushing */
if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
@@ -357,6 +361,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
g_cond_init (cond);
g_mutex_init (lock);
+ GST_MINI_OBJECT_FLAG_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
+
GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
/* now we lock the message mutex, send the message to the async
@@ -369,12 +375,18 @@ gst_bus_post (GstBus * bus, GstMessage * message)
/* now block till the message is freed */
g_cond_wait (cond, lock);
+
+ /* we acquired a new ref from gst_message_dispose() so we can clean up */
g_mutex_unlock (lock);
GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
+ GST_MINI_OBJECT_FLAG_UNSET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
+
g_mutex_clear (lock);
g_cond_clear (cond);
+
+ gst_message_unref (message);
break;
}
default:
diff --git a/gst/gstmessage.c b/gst/gstmessage.c
index 8d3892c12..9f4d286c0 100644
--- a/gst/gstmessage.c
+++ b/gst/gstmessage.c
@@ -166,6 +166,26 @@ gst_message_type_to_quark (GstMessageType type)
return 0;
}
+static gboolean
+_gst_message_dispose (GstMessage * message)
+{
+ gboolean do_free = TRUE;
+
+ if (GST_MINI_OBJECT_FLAG_IS_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY)) {
+ GST_INFO ("[msg %p] signalling async free", message);
+
+ GST_MESSAGE_LOCK (message);
+ GST_MESSAGE_SIGNAL (message);
+ GST_MESSAGE_UNLOCK (message);
+
+ /* don't free it yet, let bus finish with it first */
+ gst_message_ref (message);
+ do_free = FALSE;
+ }
+
+ return do_free;
+}
+
static void
_gst_message_free (GstMessage * message)
{
@@ -181,12 +201,6 @@ _gst_message_free (GstMessage * message)
GST_MESSAGE_SRC (message) = NULL;
}
- if (message->lock.p) {
- GST_MESSAGE_LOCK (message);
- GST_MESSAGE_SIGNAL (message);
- GST_MESSAGE_UNLOCK (message);
- }
-
structure = GST_MESSAGE_STRUCTURE (message);
if (structure) {
gst_structure_set_parent_refcount (structure, NULL);
@@ -235,7 +249,8 @@ gst_message_init (GstMessageImpl * message, GstMessageType type,
GstObject * src)
{
gst_mini_object_init (GST_MINI_OBJECT_CAST (message), 0, _gst_message_type,
- (GstMiniObjectCopyFunction) _gst_message_copy, NULL,
+ (GstMiniObjectCopyFunction) _gst_message_copy,
+ (GstMiniObjectDisposeFunction) _gst_message_dispose,
(GstMiniObjectFreeFunction) _gst_message_free);
GST_MESSAGE_TYPE (message) = type;