diff options
author | David Zeuthen <davidz@redhat.com> | 2010-07-07 15:00:23 -0400 |
---|---|---|
committer | David Zeuthen <davidz@redhat.com> | 2010-07-07 15:03:03 -0400 |
commit | 62a1ccf526e7b23ac39cdf7251eac5706eef3f57 (patch) | |
tree | 613b12bc60a3f145a5495862fa2b7075b5f2b1c3 | |
parent | d4f35ae9ed230ccc04b8f50067a0d0dda95b7d5f (diff) |
Bug 618882 – No way to ensure that a message is sent
Add g_dbus_connection_flush{_finish,sync}().
https://bugzilla.gnome.org/show_bug.cgi?id=618882
Signed-off-by: David Zeuthen <davidz@redhat.com>
-rw-r--r-- | docs/reference/gio/gio-sections.txt | 3 | ||||
-rw-r--r-- | gio/gdbusconnection.c | 157 | ||||
-rw-r--r-- | gio/gdbusconnection.h | 14 | ||||
-rw-r--r-- | gio/gdbusprivate.c | 81 | ||||
-rw-r--r-- | gio/gdbusprivate.h | 5 | ||||
-rw-r--r-- | gio/gio.symbols | 3 | ||||
-rw-r--r-- | gio/tests/Makefile.am | 4 | ||||
-rw-r--r-- | gio/tests/gdbus-connection-flush-helper.c | 60 | ||||
-rw-r--r-- | gio/tests/gdbus-connection.c | 82 |
9 files changed, 406 insertions, 3 deletions
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index 4e10c7fae..617b89318 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -2396,6 +2396,9 @@ g_dbus_connection_start_message_processing GDBusCapabilityFlags g_dbus_connection_close g_dbus_connection_is_closed +g_dbus_connection_flush +g_dbus_connection_flush_finish +g_dbus_connection_flush_sync g_dbus_connection_get_exit_on_close g_dbus_connection_set_exit_on_close g_dbus_connection_get_stream diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c index 9b94d1d95..1fa49c617 100644 --- a/gio/gdbusconnection.c +++ b/gio/gdbusconnection.c @@ -882,6 +882,151 @@ g_dbus_connection_get_capabilities (GDBusConnection *connection) return connection->priv->capabilities; } +/* ---------------------------------------------------------------------------------------------------- */ + +static void +flush_in_thread_func (GSimpleAsyncResult *res, + GObject *object, + GCancellable *cancellable) +{ + GError *error; + + error = NULL; + if (!g_dbus_connection_flush_sync (G_DBUS_CONNECTION (object), + cancellable, + &error)) + { + g_simple_async_result_set_from_error (res, error); + g_error_free (error); + } +} + +/** + * g_dbus_connection_flush: + * @connection: A #GDBusConnection. + * @cancellable: A #GCancellable or %NULL. + * @callback: A #GAsyncReadyCallback to call when the request is satisfied or %NULL if you don't + * care about the result. + * @user_data: The data to pass to @callback. + * + * Asynchronously flushes @connection, that is, writes all queued + * outgoing message to the transport and then flushes the transport + * (using g_output_stream_flush_async()). This is useful in programs + * that wants to emit a D-Bus signal and then exit + * immediately. Without flushing the connection, there is no guarantee + * that the message has been sent to the networking buffers in the OS + * kernel. + * + * This is an asynchronous method. When the operation is finished, + * @callback will be invoked in the <link + * linkend="g-main-context-push-thread-default">thread-default main + * loop</link> of the thread you are calling this method from. You can + * then call g_dbus_connection_flush_finish() to get the result of the + * operation. See g_dbus_connection_flush_sync() for the synchronous + * version. + * + * Since: 2.26 + */ +void +g_dbus_connection_flush (GDBusConnection *connection, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *simple; + + g_return_if_fail (G_IS_DBUS_CONNECTION (connection)); + + simple = g_simple_async_result_new (NULL, + callback, + user_data, + g_dbus_connection_flush); + g_simple_async_result_run_in_thread (simple, + flush_in_thread_func, + G_PRIORITY_DEFAULT, + cancellable); + g_object_unref (simple); +} + +/** + * g_dbus_connection_flush_finish: + * @connection: A #GDBusConnection. + * @res: A #GAsyncResult obtained from the #GAsyncReadyCallback passed to g_dbus_connection_flush(). + * @error: Return location for error or %NULL. + * + * Finishes an operation started with g_dbus_connection_flush(). + * + * Returns: %TRUE if the operation succeeded, %FALSE if @error is set. + * + * Since: 2.26 + */ +gboolean +g_dbus_connection_flush_finish (GDBusConnection *connection, + GAsyncResult *res, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (res); + gboolean ret; + + ret = FALSE; + + g_return_val_if_fail (G_IS_DBUS_CONNECTION (connection), FALSE); + g_return_val_if_fail (G_IS_ASYNC_RESULT (res), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_dbus_connection_flush); + + if (g_simple_async_result_propagate_error (simple, error)) + goto out; + + ret = TRUE; + + out: + return ret; +} + +/** + * g_dbus_connection_flush_sync: + * @connection: A #GDBusConnection. + * @cancellable: A #GCancellable or %NULL. + * @error: Return location for error or %NULL. + * + * Synchronously flushes @connection. The calling thread is blocked + * until this is done. See g_dbus_connection_flush() for the + * asynchronous version of this method and more details about what it + * does. + * + * Returns: %TRUE if the operation succeeded, %FALSE if @error is set. + * + * Since: 2.26 + */ +gboolean +g_dbus_connection_flush_sync (GDBusConnection *connection, + GCancellable *cancellable, + GError **error) +{ + gboolean ret; + + g_return_val_if_fail (G_IS_DBUS_CONNECTION (connection), FALSE); + + ret = FALSE; + + if (connection->priv->closed) + { + g_set_error_literal (error, + G_IO_ERROR, + G_IO_ERROR_CLOSED, + _("The connection is closed")); + goto out; + } + + ret = _g_dbus_worker_flush_sync (connection->priv->worker, + cancellable, + error); + + out: + return ret; +} /* ---------------------------------------------------------------------------------------------------- */ @@ -955,7 +1100,14 @@ set_closed_unlocked (GDBusConnection *connection, * * Closes @connection. Note that this never causes the process to * exit (this might only happen if the other end of a shared message - * bus connection disconnects). + * bus connection disconnects, see #GDBusConnection:exit-on-close). + * + * Once the stream is closed, all operations will return + * %G_IO_ERROR_CLOSED. + * + * Note that closing a connection will not automatically flush the + * connection so queued messages may be lost. Use + * g_dbus_connection_flush() if you need such guarantees. * * If @connection is already closed, this method does nothing. * @@ -1091,8 +1243,7 @@ g_dbus_connection_send_message_unlocked (GDBusConnection *connection, * submitting the message to the underlying transport. * * If @connection is closed then the operation will fail with - * %G_IO_ERROR_CLOSED. If @cancellable is canceled, the operation will - * fail with %G_IO_ERROR_CANCELLED. If @message is not well-formed, + * %G_IO_ERROR_CLOSED. If @message is not well-formed, * the operation fails with %G_IO_ERROR_INVALID_ARGUMENT. * * See <xref linkend="gdbus-server"/> and <xref diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h index 36c03c657..f422ccc2b 100644 --- a/gio/gdbusconnection.h +++ b/gio/gdbusconnection.h @@ -139,6 +139,20 @@ gboolean g_dbus_connection_get_exit_on_close (GDBusConnection void g_dbus_connection_set_exit_on_close (GDBusConnection *connection, gboolean exit_on_close); GDBusCapabilityFlags g_dbus_connection_get_capabilities (GDBusConnection *connection); + +/* ---------------------------------------------------------------------------------------------------- */ + +void g_dbus_connection_flush (GDBusConnection *connection, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +gboolean g_dbus_connection_flush_finish (GDBusConnection *connection, + GAsyncResult *res, + GError **error); +gboolean g_dbus_connection_flush_sync (GDBusConnection *connection, + GCancellable *cancellable, + GError **error); + /* ---------------------------------------------------------------------------------------------------- */ gboolean g_dbus_connection_send_message (GDBusConnection *connection, diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index bcc031e84..819aa3400 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -388,8 +388,17 @@ struct GDBusWorker GMutex *write_lock; GQueue *write_queue; gboolean write_is_pending; + guint64 write_num_messages_written; + GList *write_pending_flushes; }; +typedef struct +{ + GMutex *mutex; + GCond *cond; + guint64 number_to_wait_for; +} FlushData; + struct _MessageToWriteData ; typedef struct _MessageToWriteData MessageToWriteData; @@ -407,6 +416,8 @@ _g_dbus_worker_unref (GDBusWorker *worker) { if (g_atomic_int_dec_and_test (&worker->ref_count)) { + g_assert (worker->write_pending_flushes == NULL); + _g_dbus_shared_thread_unref (); g_object_unref (worker->stream); @@ -815,6 +826,8 @@ write_message (GDBusWorker *worker, GError **error) { gboolean ret; + GList *l; + GList *ll; g_return_val_if_fail (data->blob_size > 16, FALSE); @@ -908,6 +921,24 @@ write_message (GDBusWorker *worker, ret = TRUE; + /* wake up pending flushes */ + g_mutex_lock (worker->write_lock); + worker->write_num_messages_written += 1; + for (l = worker->write_pending_flushes; l != NULL; l = ll) + { + FlushData *f = l->data; + ll = l->next; + + if (f->number_to_wait_for == worker->write_num_messages_written) + { + g_mutex_lock (f->mutex); + g_cond_signal (f->cond); + g_mutex_unlock (f->mutex); + worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); + } + } + g_mutex_unlock (worker->write_lock); + if (G_UNLIKELY (_g_dbus_debug_message ())) { gchar *s; @@ -1072,6 +1103,8 @@ _g_dbus_worker_new (GIOStream *stream, return worker; } +/* ---------------------------------------------------------------------------------------------------- */ + /* This can be called from any thread - frees worker - guarantees no callbacks * will ever be issued again */ @@ -1092,6 +1125,54 @@ _g_dbus_worker_stop (GDBusWorker *worker) _g_dbus_worker_unref (worker); } +/* ---------------------------------------------------------------------------------------------------- */ + +/* can be called from any thread (except the worker thread) - blocks + * calling thread until all queued outgoing messages are written and + * the transport has been flushed + */ +gboolean +_g_dbus_worker_flush_sync (GDBusWorker *worker, + GCancellable *cancellable, + GError **error) +{ + gboolean ret; + FlushData *data; + + data = NULL; + + /* if the queue is empty, there's nothing to wait for */ + g_mutex_lock (worker->write_lock); + if (g_queue_get_length (worker->write_queue) > 0) + { + data = g_new0 (FlushData, 1); + data->mutex = g_mutex_new (); + data->cond = g_cond_new (); + data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue); + g_mutex_lock (data->mutex); + worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data); + } + g_mutex_unlock (worker->write_lock); + + if (data != NULL) + { + g_cond_wait (data->cond, data->mutex); + g_mutex_unlock (data->mutex); + + /* note:the element is removed from worker->write_pending_flushes in write_message() */ + g_cond_free (data->cond); + g_mutex_free (data->mutex); + g_free (data); + } + + ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream), + cancellable, + error); + return ret; +} + +/* ---------------------------------------------------------------------------------------------------- */ + #define G_DBUS_DEBUG_AUTHENTICATION (1<<0) #define G_DBUS_DEBUG_MESSAGE (1<<1) #define G_DBUS_DEBUG_PAYLOAD (1<<2) diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h index 91a78c494..a879fab13 100644 --- a/gio/gdbusprivate.h +++ b/gio/gdbusprivate.h @@ -71,6 +71,11 @@ void _g_dbus_worker_stop (GDBusWorker *worker); /* can be called from any thread */ void _g_dbus_worker_unfreeze (GDBusWorker *worker); +/* can be called from any thread (except the worker thread) */ +gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker, + GCancellable *cancellable, + GError **error); + /* ---------------------------------------------------------------------------------------------------- */ void _g_dbus_initialize (void); diff --git a/gio/gio.symbols b/gio/gio.symbols index 8f43a5407..abe883574 100644 --- a/gio/gio.symbols +++ b/gio/gio.symbols @@ -1541,6 +1541,9 @@ g_dbus_connection_get_unique_name g_dbus_connection_is_closed g_dbus_connection_set_exit_on_close g_dbus_connection_close +g_dbus_connection_flush +g_dbus_connection_flush_finish +g_dbus_connection_flush_sync g_dbus_connection_emit_signal g_dbus_connection_call g_dbus_connection_call_finish diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am index 8886f1b3e..b724fe2d2 100644 --- a/gio/tests/Makefile.am +++ b/gio/tests/Makefile.am @@ -81,6 +81,7 @@ SAMPLE_PROGS = \ gdbus-example-subtree \ gdbus-example-peer \ gdbus-example-proxy-subclass \ + gdbus-connection-flush-helper \ testapp \ appinfo-test \ $(NULL) @@ -264,6 +265,9 @@ gdbus_example_proxy_subclass_LDADD = $(progs_ldadd) gdbus_example_export_SOURCES = gdbus-example-export.c gdbus_example_export_LDADD = $(progs_ldadd) +gdbus_connection_flush_helper_SOURCES = gdbus-connection-flush-helper.c +gdbus_connection_flush_helper_LDADD = $(progs_ldadd) + application_SOURCES = application.c gdbus-sessionbus.c gdbus-sessionbus.h application_LDADD = $(progs_ldadd) diff --git a/gio/tests/gdbus-connection-flush-helper.c b/gio/tests/gdbus-connection-flush-helper.c new file mode 100644 index 000000000..5e3607578 --- /dev/null +++ b/gio/tests/gdbus-connection-flush-helper.c @@ -0,0 +1,60 @@ +/* GLib testing framework examples and tests + * + * Copyright (C) 2008-2010 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + * + * Author: David Zeuthen <davidz@redhat.com> + */ + +#include <gio/gio.h> + +int +main (int argc, + char *argv[]) +{ + GDBusConnection *c; + GError *error; + gboolean ret; + + g_type_init (); + + error = NULL; + c = g_bus_get_sync (G_BUS_TYPE_SESSION, + NULL, /* GCancellable* */ + &error); + g_assert_no_error (error); + + error = NULL; + g_dbus_connection_emit_signal (c, + NULL, /* const gchar *destination_bus_name */ + "/org/gtk/GDBus/FlushObject", + "org.gtk.GDBus.FlushInterface", + "SomeSignal", + NULL, /* GVariant *parameters */ + &error); + g_assert_no_error (error); + + error = NULL; + ret = g_dbus_connection_flush_sync (c, + NULL, /* GCancellable* */ + &error); + g_assert_no_error (error); + g_assert (ret); + + /* and now exit immediately! */ + return 0; +} diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c index 3c63caebb..cd6aa0d2b 100644 --- a/gio/tests/gdbus-connection.c +++ b/gio/tests/gdbus-connection.c @@ -24,6 +24,9 @@ #include <unistd.h> #include <string.h> +#include <sys/types.h> +#include <sys/wait.h> + #include "gdbus-tests.h" /* all tests rely on a shared mainloop */ @@ -661,6 +664,84 @@ test_connection_filter (void) /* ---------------------------------------------------------------------------------------------------- */ +static void +test_connection_flush_signal_handler (GDBusConnection *connection, + const gchar *sender_name, + const gchar *object_path, + const gchar *interface_name, + const gchar *signal_name, + GVariant *parameters, + gpointer user_data) +{ + g_main_loop_quit (loop); +} + +static gboolean +test_connection_flush_on_timeout (gpointer user_data) +{ + guint iteration = GPOINTER_TO_UINT (user_data); + g_printerr ("Timeout waiting 1000 msec on iteration %d\n", iteration); + g_assert_not_reached (); + return FALSE; +} + +static void +test_connection_flush (void) +{ + GDBusConnection *connection; + GError *error; + guint n; + guint signal_handler_id; + + session_bus_up (); + + error = NULL; + connection = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error); + g_assert_no_error (error); + g_assert (connection != NULL); + + signal_handler_id = g_dbus_connection_signal_subscribe (connection, + NULL, /* sender */ + "org.gtk.GDBus.FlushInterface", + "SomeSignal", + "/org/gtk/GDBus/FlushObject", + NULL, + test_connection_flush_signal_handler, + NULL, + NULL); + g_assert_cmpint (signal_handler_id, !=, 0); + + for (n = 0; n < 50; n++) + { + gboolean ret; + gint exit_status; + guint timeout_mainloop_id; + + error = NULL; + ret = g_spawn_command_line_sync ("./gdbus-connection-flush-helper", + NULL, /* stdout */ + NULL, /* stderr */ + &exit_status, + &error); + g_assert_no_error (error); + g_assert (WIFEXITED (exit_status)); + g_assert_cmpint (WEXITSTATUS (exit_status), ==, 0); + g_assert (ret); + + timeout_mainloop_id = g_timeout_add (1000, test_connection_flush_on_timeout, GUINT_TO_POINTER (n)); + g_main_loop_run (loop); + g_source_remove (timeout_mainloop_id); + } + + g_dbus_connection_signal_unsubscribe (connection, signal_handler_id); + _g_object_wait_for_single_ref (connection); + g_object_unref (connection); + + session_bus_down (); +} + +/* ---------------------------------------------------------------------------------------------------- */ + int main (int argc, char *argv[]) @@ -681,5 +762,6 @@ main (int argc, g_test_add_func ("/gdbus/connection-send", test_connection_send); g_test_add_func ("/gdbus/connection-signals", test_connection_signals); g_test_add_func ("/gdbus/connection-filter", test_connection_filter); + g_test_add_func ("/gdbus/connection-flush", test_connection_flush); return g_test_run(); } |