diff options
author | David Zeuthen <davidz@redhat.com> | 2010-04-27 15:50:31 -0400 |
---|---|---|
committer | David Zeuthen <davidz@redhat.com> | 2010-04-27 15:50:31 -0400 |
commit | df990c08190b5b7d575d8b5fcaf0875cafd2061b (patch) | |
tree | 85ca6c84087e5f3776a8c2ca71a8173f8060ebd5 | |
parent | de82c0c9aa4e5150d065c1bc0b90e1413b537c1b (diff) |
Move writing to our dedicated worker thread
-rw-r--r-- | gdbus/gdbusconnection.c | 135 | ||||
-rw-r--r-- | gdbus/gdbusprivate.c | 290 | ||||
-rw-r--r-- | gdbus/gdbusprivate.h | 7 |
3 files changed, 264 insertions, 168 deletions
diff --git a/gdbus/gdbusconnection.c b/gdbus/gdbusconnection.c index 3fca1f1..8a32dc9 100644 --- a/gdbus/gdbusconnection.c +++ b/gdbus/gdbusconnection.c @@ -815,123 +815,6 @@ g_dbus_connection_close (GDBusConnection *connection) /* ---------------------------------------------------------------------------------------------------- */ -/* caller must hold lock */ -static gboolean -g_dbus_connection_write (GDBusConnection *connection, - const gchar *blob, - gssize blob_size, - GUnixFDList *fd_list, - GError **error) -{ - gboolean ret; - - ret = FALSE; - - if (connection->priv->closed) - { - g_set_error (error, - G_IO_ERROR, - G_IO_ERROR_CLOSED, - _("The connection is closed")); - goto out; - } - - /* TODO: do this in private thread */ - - if (blob_size == -1) - blob_size = strlen (blob); - - g_assert (blob_size > 16); - - /* First, the initial 16 bytes - special case UNIX sockets here - * since it may involve writing an ancillary message - */ -#ifdef G_OS_UNIX - { - GOutputVector vector; - GSocketControlMessage *message; - GSocket *socket; - gssize bytes_written; - - message = NULL; - if (fd_list != NULL) - { - if (!G_IS_UNIX_CONNECTION (connection->priv->stream)) - { - g_set_error (error, - G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Tried sending a file descriptor on unsupported stream of type %s", - g_type_name (G_TYPE_FROM_INSTANCE (connection->priv->stream))); - goto out; - } - else if (!(connection->priv->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) - { - g_set_error_literal (error, - G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Tried sending a file descriptor but remote peer does not support this capability"); - goto out; - } - message = g_unix_fd_message_new_with_fd_list (fd_list); - } - - socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (connection->priv->stream)); - vector.buffer = blob; - vector.size = 16; - - bytes_written = g_socket_send_message (socket, - NULL, /* address */ - &vector, - 1, - message != NULL ? &message : NULL, - message != NULL ? 1 : 0, - G_SOCKET_MSG_NONE, - NULL, /* cancellable */ - error); - if (bytes_written == -1) - { - g_prefix_error (error, _("Error writing file descriptors to socket: ")); - g_object_unref (message); - goto out; - } - if (bytes_written < 16) - { - /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary - * messages are sent? - */ - g_assert_not_reached (); - } - if (message != NULL) - g_object_unref (message); - } -#else - if (!g_output_stream_write_all (g_io_stream_get_output_stream (connection->priv->stream), - (const gchar *) blob, - 16, - NULL, /* bytes_written */ - NULL, /* cancellable */ - error)) - goto out; -#endif - - /* Then write the rest of the message */ - if (!g_output_stream_write_all (g_io_stream_get_output_stream (connection->priv->stream), - (const gchar *) blob + 16, - blob_size - 16, - NULL, /* bytes_written */ - NULL, /* cancellable */ - error)) - goto out; - - ret = TRUE; - - out: - return ret; -} - -/* ---------------------------------------------------------------------------------------------------- */ - static gboolean g_dbus_connection_send_message_unlocked (GDBusConnection *connection, GDBusMessage *message, @@ -986,21 +869,16 @@ g_dbus_connection_send_message_unlocked (GDBusConnection *connection, g_printerr ("----\n"); #endif - /* TODO: use connection->priv->auth to encode the message */ + /* TODO: use connection->priv->auth to encode the blob */ if (out_serial != NULL) *out_serial = serial_to_use; - if (!g_dbus_connection_write (connection, - (const gchar *) blob, - blob_size, -#ifdef G_OS_UNIX - g_dbus_message_get_unix_fd_list (message), -#else - NULL, -#endif - error)) - goto out; + _g_dbus_worker_send_message (connection->priv->worker, + message, + (gchar*) blob, + blob_size); + blob = NULL; /* since _g_dbus_worker_send_message() steals the blob */ ret = TRUE; @@ -1636,6 +1514,7 @@ initable_init (GInitable *initable, } connection->priv->worker = _g_dbus_worker_new (connection->priv->stream, + connection->priv->capabilities, on_worker_message_received, on_worker_closed, connection); diff --git a/gdbus/gdbusprivate.c b/gdbus/gdbusprivate.c index 0723a9f..fad7b4d 100644 --- a/gdbus/gdbusprivate.c +++ b/gdbus/gdbusprivate.c @@ -301,8 +301,8 @@ struct GDBusWorker { volatile gint ref_count; gboolean stopped; - GMutex *lock; GIOStream *stream; + GDBusCapabilityFlags capabilities; GCancellable *cancellable; GDBusWorkerMessageReceivedCallback message_received_callback; GDBusWorkerDisconnectedCallback disconnected_callback; @@ -310,20 +310,30 @@ struct GDBusWorker GThread *thread; - /* used for reading */ - gchar *read_buffer; - gsize read_buffer_allocated_size; - gsize read_buffer_cur_size; - gsize read_buffer_bytes_wanted; - GUnixFDList *fd_list; - - GSocketControlMessage **read_ancillary_messages; - gint read_num_ancillary_messages; - /* if not NULL, stream is GSocketConnection */ GSocket *socket; + + /* used for reading */ + GMutex *read_lock; + gchar *read_buffer; + gsize read_buffer_allocated_size; + gsize read_buffer_cur_size; + gsize read_buffer_bytes_wanted; + GUnixFDList *read_fd_list; + GSocketControlMessage **read_ancillary_messages; + gint read_num_ancillary_messages; + + /* used for writing */ + GMutex *write_lock; + GQueue *write_queue; + gboolean write_is_pending; }; +struct _MessageToWriteData ; +typedef struct _MessageToWriteData MessageToWriteData; + +static void message_to_write_data_free (MessageToWriteData *data); + static GDBusWorker * _g_dbus_worker_ref (GDBusWorker *worker) { @@ -339,10 +349,17 @@ _g_dbus_worker_unref (GDBusWorker *worker) _g_dbus_shared_thread_unref (); g_object_unref (worker->stream); - g_mutex_free (worker->lock); + + g_mutex_free (worker->read_lock); g_object_unref (worker->cancellable); - if (worker->fd_list != NULL) - g_object_unref (worker->fd_list); + if (worker->read_fd_list != NULL) + g_object_unref (worker->read_fd_list); + + g_mutex_free (worker->write_lock); + g_queue_foreach (worker->write_queue, + (GFunc) message_to_write_data_free, + NULL); + g_queue_free (worker->write_queue); g_free (worker); } } @@ -366,7 +383,7 @@ _g_dbus_worker_emit_message (GDBusWorker *worker, static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); -/* called in private thread shared by all GDBusConnection instances (without lock held) */ +/* called in private thread shared by all GDBusConnection instances (without read-lock held) */ static void _g_dbus_worker_do_read_cb (GInputStream *input_stream, GAsyncResult *res, @@ -376,7 +393,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, GError *error; gssize bytes_read; - g_mutex_lock (worker->lock); + g_mutex_lock (worker->read_lock); /* If already stopped, don't even process the reply */ if (worker->stopped) @@ -411,9 +428,9 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, fd_message = G_UNIX_FD_MESSAGE (control_message); fds = g_unix_fd_message_steal_fds (fd_message, &num_fds); - if (worker->fd_list == NULL) + if (worker->read_fd_list == NULL) { - worker->fd_list = g_unix_fd_list_new_from_array (fds, num_fds); + worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds); } else { @@ -421,7 +438,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, for (n = 0; n < num_fds; n++) { /* TODO: really want a append_steal() */ - g_unix_fd_list_append (worker->fd_list, fds[n], NULL); + g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL); close (fds[n]); } } @@ -522,10 +539,10 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; } - if (worker->fd_list != NULL) + if (worker->read_fd_list != NULL) { - g_dbus_message_set_unix_fd_list (message, worker->fd_list); - worker->fd_list = NULL; + g_dbus_message_set_unix_fd_list (message, worker->read_fd_list); + worker->read_fd_list = NULL; } /* yay, got a message, go deliver it */ @@ -545,13 +562,13 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, } out: - g_mutex_unlock (worker->lock); + g_mutex_unlock (worker->read_lock); /* gives up the reference acquired when calling g_input_stream_read_async() */ _g_dbus_worker_unref (worker); } -/* called in private thread shared by all GDBusConnection instances (with lock held) */ +/* called in private thread shared by all GDBusConnection instances (with read-lock held) */ static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) { @@ -594,15 +611,214 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) } } -/* called in private thread shared by all GDBusConnection instances (without lock held) */ +/* called in private thread shared by all GDBusConnection instances (without read-lock held) */ static void _g_dbus_worker_do_read (GDBusWorker *worker) { - g_mutex_lock (worker->lock); + g_mutex_lock (worker->read_lock); _g_dbus_worker_do_read_unlocked (worker); - g_mutex_unlock (worker->lock); + g_mutex_unlock (worker->read_lock); } +/* ---------------------------------------------------------------------------------------------------- */ + +struct _MessageToWriteData +{ + GDBusMessage *message; + gchar *blob; + gsize blob_size; +}; + +static void +message_to_write_data_free (MessageToWriteData *data) +{ + g_object_unref (data->message); + g_free (data->blob); + g_free (data); +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* called in private thread shared by all GDBusConnection instances (with write-lock held) */ +static gboolean +write_message (GDBusWorker *worker, + MessageToWriteData *data, + GError **error) +{ + gboolean ret; + + g_assert (data->blob_size > 16); + + ret = FALSE; + + /* First, the initial 16 bytes - special case UNIX sockets here + * since it may involve writing an ancillary message with file + * descriptors + */ +#ifdef G_OS_UNIX + { + GOutputVector vector; + GSocketControlMessage *message; + GUnixFDList *fd_list; + gssize bytes_written; + + fd_list = g_dbus_message_get_unix_fd_list (data->message); + + message = NULL; + if (fd_list != NULL) + { + if (!G_IS_UNIX_CONNECTION (worker->stream)) + { + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_INVALID_ARGUMENT, + "Tried sending a file descriptor on unsupported stream of type %s", + g_type_name (G_TYPE_FROM_INSTANCE (worker->stream))); + goto out; + } + else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) + { + g_set_error_literal (error, + G_IO_ERROR, + G_IO_ERROR_INVALID_ARGUMENT, + "Tried sending a file descriptor but remote peer does not support this capability"); + goto out; + } + message = g_unix_fd_message_new_with_fd_list (fd_list); + } + + vector.buffer = data->blob; + vector.size = 16; + + bytes_written = g_socket_send_message (worker->socket, + NULL, /* address */ + &vector, + 1, + message != NULL ? &message : NULL, + message != NULL ? 1 : 0, + G_SOCKET_MSG_NONE, + worker->cancellable, + error); + if (bytes_written == -1) + { + g_prefix_error (error, _("Error writing first 16 bytes of message to socket: ")); + if (message != NULL) + g_object_unref (message); + goto out; + } + if (message != NULL) + g_object_unref (message); + + if (bytes_written < 16) + { + /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary + * messages are sent? + */ + g_assert_not_reached (); + } + } +#else + /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */ + if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), + (const gchar *) data->blob, + 16, + NULL, /* bytes_written */ + worker->cancellable, /* cancellable */ + error)) + goto out; +#endif + + /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */ + if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), + (const gchar *) data->blob + 16, + data->blob_size - 16, + NULL, /* bytes_written */ + worker->cancellable, /* cancellable */ + error)) + goto out; + + ret = TRUE; + + out: + return ret; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static gboolean +write_message_in_idle_cb (gpointer user_data) +{ + GDBusWorker *worker = user_data; + gboolean more_writes_are_pending; + MessageToWriteData *data; + GError *error; + + g_mutex_lock (worker->write_lock); + + data = g_queue_pop_head (worker->write_queue); + g_assert (data != NULL); + + error = NULL; + if (!write_message (worker, + data, + &error)) + { + /* TODO: handle */ + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); + } + message_to_write_data_free (data); + + more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0); + + worker->write_is_pending = more_writes_are_pending; + g_mutex_unlock (worker->write_lock); + + return more_writes_are_pending; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* can be called from any thread - steals blob */ +void +_g_dbus_worker_send_message (GDBusWorker *worker, + GDBusMessage *message, + gchar *blob, + gsize blob_len) +{ + MessageToWriteData *data; + + g_return_if_fail (G_IS_DBUS_MESSAGE (message)); + g_return_if_fail (blob != NULL); + g_return_if_fail (blob_len > 16); + + data = g_new0 (MessageToWriteData, 1); + data->message = g_object_ref (message); + data->blob = blob; /* steal! */ + data->blob_size = blob_len; + + g_mutex_lock (worker->write_lock); + g_queue_push_tail (worker->write_queue, data); + if (!worker->write_is_pending) + { + GSource *idle_source; + + worker->write_is_pending = TRUE; + + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + write_message_in_idle_cb, + _g_dbus_worker_ref (worker), + (GDestroyNotify) _g_dbus_worker_unref); + g_source_attach (idle_source, shared_thread_data->context); + g_source_unref (idle_source); + } + g_mutex_unlock (worker->write_lock); +} + +/* ---------------------------------------------------------------------------------------------------- */ static void _g_dbus_worker_thread_begin_func (gpointer user_data) @@ -617,6 +833,7 @@ _g_dbus_worker_thread_begin_func (gpointer user_data) GDBusWorker * _g_dbus_worker_new (GIOStream *stream, + GDBusCapabilityFlags capabilities, GDBusWorkerMessageReceivedCallback message_received_callback, GDBusWorkerDisconnectedCallback disconnected_callback, gpointer user_data) @@ -629,13 +846,18 @@ _g_dbus_worker_new (GIOStream *stream, worker = g_new0 (GDBusWorker, 1); worker->ref_count = 1; - worker->lock = g_mutex_new (); + + worker->read_lock = g_mutex_new (); worker->message_received_callback = message_received_callback; worker->disconnected_callback = disconnected_callback; worker->user_data = user_data; worker->stream = g_object_ref (stream); + worker->capabilities = capabilities; worker->cancellable = g_cancellable_new (); + worker->write_lock = g_mutex_new (); + worker->write_queue = g_queue_new (); + if (G_IS_SOCKET_CONNECTION (worker->stream)) worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); @@ -644,14 +866,6 @@ _g_dbus_worker_new (GIOStream *stream, return worker; } -void -_g_dbus_worker_send_message (GDBusWorker *worker, - GDBusMessage *message) -{ - g_assert_not_reached (); -} - - /* This can be called from any thread - frees worker - guarantees no callbacks * will ever be issued again */ @@ -663,10 +877,10 @@ _g_dbus_worker_stop (GDBusWorker *worker) * we're already holding the lock... */ if (g_thread_self () != worker->thread) - g_mutex_lock (worker->lock); + g_mutex_lock (worker->read_lock); worker->stopped = TRUE; if (g_thread_self () != worker->thread) - g_mutex_unlock (worker->lock); + g_mutex_unlock (worker->read_lock); g_cancellable_cancel (worker->cancellable); _g_dbus_worker_unref (worker); diff --git a/gdbus/gdbusprivate.h b/gdbus/gdbusprivate.h index 3d250c6..e31f216 100644 --- a/gdbus/gdbusprivate.h +++ b/gdbus/gdbusprivate.h @@ -49,13 +49,16 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker, * and must not block. */ GDBusWorker *_g_dbus_worker_new (GIOStream *stream, + GDBusCapabilityFlags capabilities, GDBusWorkerMessageReceivedCallback message_received_callback, GDBusWorkerDisconnectedCallback disconnected_callback, gpointer user_data); -/* can be called from any thread */ +/* can be called from any thread - steals blob */ void _g_dbus_worker_send_message (GDBusWorker *worker, - GDBusMessage *message); + GDBusMessage *message, + gchar *blob, + gsize blob_len); /* can be called from any thread */ void _g_dbus_worker_stop (GDBusWorker *worker); |