diff options
author | Tim-Philipp Müller <tim@centricular.com> | 2015-02-16 22:39:42 +0000 |
---|---|---|
committer | Tim-Philipp Müller <tim@centricular.com> | 2015-02-17 09:52:09 +0000 |
commit | da7847d1adde088b4d8bcfc76642cf942e735dd3 (patch) | |
tree | cb9f399b3e8882e2c5e998912cc739f828f7f097 | |
parent | 9f58aa080a41759462ff5771cb259ee64a7ea51e (diff) |
message, bus: fix async message delivery
Async message delivery (where the posting thread gets blocked
until the message has been processed and/or freed) was pretty
much completely broken.
For one, don't use GMutex implementation details to check
whether a mutex has been initialized or not, esp. not
implementation details that don't hold true any more with
newer GLib versions where atomic ops and futexes are used
(spotted by Josep Torras). This led to async message
delivery no longer blocking with newer GLib versions on
Linux.
Secondly, after async delivery don't free mutex/GCond
embedded inside the just-freed message structure.
Use a new (private) mini object flag to signal GstMessage
that the message being freed is part of an async delivery
on the bus so that the dispose handler can keep the message
alive and the bus can free it once it's done cleaning up
stuff.
-rw-r--r-- | gst/gst_private.h | 3 | ||||
-rw-r--r-- | gst/gstbus.c | 12 | ||||
-rw-r--r-- | gst/gstmessage.c | 29 |
3 files changed, 37 insertions, 7 deletions
diff --git a/gst/gst_private.h b/gst/gst_private.h index 8c9cb4bbe..d0aefb4a5 100644 --- a/gst/gst_private.h +++ b/gst/gst_private.h @@ -426,5 +426,8 @@ struct _GstDeviceProviderFactoryClass { gpointer _gst_reserved[GST_PADDING]; }; +/* privat flag used by GstBus / GstMessage */ +#define GST_MESSAGE_FLAG_ASYNC_DELIVERY (GST_MINI_OBJECT_FLAG_LAST << 0) + G_END_DECLS #endif /* __GST_PRIVATE_H__ */ diff --git a/gst/gstbus.c b/gst/gstbus.c index ff458cd54..13b05462a 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -308,6 +308,10 @@ gst_bus_post (GstBus * bus, GstMessage * message) GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message, message); + /* check we didn't accidentally add a public flag that maps to same value */ + g_assert (!GST_MINI_OBJECT_FLAG_IS_SET (message, + GST_MESSAGE_FLAG_ASYNC_DELIVERY)); + GST_OBJECT_LOCK (bus); /* check if the bus is flushing */ if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) @@ -357,6 +361,8 @@ gst_bus_post (GstBus * bus, GstMessage * message) g_cond_init (cond); g_mutex_init (lock); + GST_MINI_OBJECT_FLAG_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY); + GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message); /* now we lock the message mutex, send the message to the async @@ -369,12 +375,18 @@ gst_bus_post (GstBus * bus, GstMessage * message) /* now block till the message is freed */ g_cond_wait (cond, lock); + + /* we acquired a new ref from gst_message_dispose() so we can clean up */ g_mutex_unlock (lock); GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message); + GST_MINI_OBJECT_FLAG_UNSET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY); + g_mutex_clear (lock); g_cond_clear (cond); + + gst_message_unref (message); break; } default: diff --git a/gst/gstmessage.c b/gst/gstmessage.c index 8d3892c12..9f4d286c0 100644 --- a/gst/gstmessage.c +++ b/gst/gstmessage.c @@ -166,6 +166,26 @@ gst_message_type_to_quark (GstMessageType type) return 0; } +static gboolean +_gst_message_dispose (GstMessage * message) +{ + gboolean do_free = TRUE; + + if (GST_MINI_OBJECT_FLAG_IS_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY)) { + GST_INFO ("[msg %p] signalling async free", message); + + GST_MESSAGE_LOCK (message); + GST_MESSAGE_SIGNAL (message); + GST_MESSAGE_UNLOCK (message); + + /* don't free it yet, let bus finish with it first */ + gst_message_ref (message); + do_free = FALSE; + } + + return do_free; +} + static void _gst_message_free (GstMessage * message) { @@ -181,12 +201,6 @@ _gst_message_free (GstMessage * message) GST_MESSAGE_SRC (message) = NULL; } - if (message->lock.p) { - GST_MESSAGE_LOCK (message); - GST_MESSAGE_SIGNAL (message); - GST_MESSAGE_UNLOCK (message); - } - structure = GST_MESSAGE_STRUCTURE (message); if (structure) { gst_structure_set_parent_refcount (structure, NULL); @@ -235,7 +249,8 @@ gst_message_init (GstMessageImpl * message, GstMessageType type, GstObject * src) { gst_mini_object_init (GST_MINI_OBJECT_CAST (message), 0, _gst_message_type, - (GstMiniObjectCopyFunction) _gst_message_copy, NULL, + (GstMiniObjectCopyFunction) _gst_message_copy, + (GstMiniObjectDisposeFunction) _gst_message_dispose, (GstMiniObjectFreeFunction) _gst_message_free); GST_MESSAGE_TYPE (message) = type; |