summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Zeuthen <davidz@redhat.com>2010-04-27 15:50:31 -0400
committerDavid Zeuthen <davidz@redhat.com>2010-04-27 15:50:31 -0400
commitdf990c08190b5b7d575d8b5fcaf0875cafd2061b (patch)
tree85ca6c84087e5f3776a8c2ca71a8173f8060ebd5
parentde82c0c9aa4e5150d065c1bc0b90e1413b537c1b (diff)
Move writing to our dedicated worker thread
-rw-r--r--gdbus/gdbusconnection.c135
-rw-r--r--gdbus/gdbusprivate.c290
-rw-r--r--gdbus/gdbusprivate.h7
3 files changed, 264 insertions, 168 deletions
diff --git a/gdbus/gdbusconnection.c b/gdbus/gdbusconnection.c
index 3fca1f1..8a32dc9 100644
--- a/gdbus/gdbusconnection.c
+++ b/gdbus/gdbusconnection.c
@@ -815,123 +815,6 @@ g_dbus_connection_close (GDBusConnection *connection)
/* ---------------------------------------------------------------------------------------------------- */
-/* caller must hold lock */
-static gboolean
-g_dbus_connection_write (GDBusConnection *connection,
- const gchar *blob,
- gssize blob_size,
- GUnixFDList *fd_list,
- GError **error)
-{
- gboolean ret;
-
- ret = FALSE;
-
- if (connection->priv->closed)
- {
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_CLOSED,
- _("The connection is closed"));
- goto out;
- }
-
- /* TODO: do this in private thread */
-
- if (blob_size == -1)
- blob_size = strlen (blob);
-
- g_assert (blob_size > 16);
-
- /* First, the initial 16 bytes - special case UNIX sockets here
- * since it may involve writing an ancillary message
- */
-#ifdef G_OS_UNIX
- {
- GOutputVector vector;
- GSocketControlMessage *message;
- GSocket *socket;
- gssize bytes_written;
-
- message = NULL;
- if (fd_list != NULL)
- {
- if (!G_IS_UNIX_CONNECTION (connection->priv->stream))
- {
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_INVALID_ARGUMENT,
- "Tried sending a file descriptor on unsupported stream of type %s",
- g_type_name (G_TYPE_FROM_INSTANCE (connection->priv->stream)));
- goto out;
- }
- else if (!(connection->priv->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
- {
- g_set_error_literal (error,
- G_IO_ERROR,
- G_IO_ERROR_INVALID_ARGUMENT,
- "Tried sending a file descriptor but remote peer does not support this capability");
- goto out;
- }
- message = g_unix_fd_message_new_with_fd_list (fd_list);
- }
-
- socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (connection->priv->stream));
- vector.buffer = blob;
- vector.size = 16;
-
- bytes_written = g_socket_send_message (socket,
- NULL, /* address */
- &vector,
- 1,
- message != NULL ? &message : NULL,
- message != NULL ? 1 : 0,
- G_SOCKET_MSG_NONE,
- NULL, /* cancellable */
- error);
- if (bytes_written == -1)
- {
- g_prefix_error (error, _("Error writing file descriptors to socket: "));
- g_object_unref (message);
- goto out;
- }
- if (bytes_written < 16)
- {
- /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
- * messages are sent?
- */
- g_assert_not_reached ();
- }
- if (message != NULL)
- g_object_unref (message);
- }
-#else
- if (!g_output_stream_write_all (g_io_stream_get_output_stream (connection->priv->stream),
- (const gchar *) blob,
- 16,
- NULL, /* bytes_written */
- NULL, /* cancellable */
- error))
- goto out;
-#endif
-
- /* Then write the rest of the message */
- if (!g_output_stream_write_all (g_io_stream_get_output_stream (connection->priv->stream),
- (const gchar *) blob + 16,
- blob_size - 16,
- NULL, /* bytes_written */
- NULL, /* cancellable */
- error))
- goto out;
-
- ret = TRUE;
-
- out:
- return ret;
-}
-
-/* ---------------------------------------------------------------------------------------------------- */
-
static gboolean
g_dbus_connection_send_message_unlocked (GDBusConnection *connection,
GDBusMessage *message,
@@ -986,21 +869,16 @@ g_dbus_connection_send_message_unlocked (GDBusConnection *connection,
g_printerr ("----\n");
#endif
- /* TODO: use connection->priv->auth to encode the message */
+ /* TODO: use connection->priv->auth to encode the blob */
if (out_serial != NULL)
*out_serial = serial_to_use;
- if (!g_dbus_connection_write (connection,
- (const gchar *) blob,
- blob_size,
-#ifdef G_OS_UNIX
- g_dbus_message_get_unix_fd_list (message),
-#else
- NULL,
-#endif
- error))
- goto out;
+ _g_dbus_worker_send_message (connection->priv->worker,
+ message,
+ (gchar*) blob,
+ blob_size);
+ blob = NULL; /* since _g_dbus_worker_send_message() steals the blob */
ret = TRUE;
@@ -1636,6 +1514,7 @@ initable_init (GInitable *initable,
}
connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
+ connection->priv->capabilities,
on_worker_message_received,
on_worker_closed,
connection);
diff --git a/gdbus/gdbusprivate.c b/gdbus/gdbusprivate.c
index 0723a9f..fad7b4d 100644
--- a/gdbus/gdbusprivate.c
+++ b/gdbus/gdbusprivate.c
@@ -301,8 +301,8 @@ struct GDBusWorker
{
volatile gint ref_count;
gboolean stopped;
- GMutex *lock;
GIOStream *stream;
+ GDBusCapabilityFlags capabilities;
GCancellable *cancellable;
GDBusWorkerMessageReceivedCallback message_received_callback;
GDBusWorkerDisconnectedCallback disconnected_callback;
@@ -310,20 +310,30 @@ struct GDBusWorker
GThread *thread;
- /* used for reading */
- gchar *read_buffer;
- gsize read_buffer_allocated_size;
- gsize read_buffer_cur_size;
- gsize read_buffer_bytes_wanted;
- GUnixFDList *fd_list;
-
- GSocketControlMessage **read_ancillary_messages;
- gint read_num_ancillary_messages;
-
/* if not NULL, stream is GSocketConnection */
GSocket *socket;
+
+ /* used for reading */
+ GMutex *read_lock;
+ gchar *read_buffer;
+ gsize read_buffer_allocated_size;
+ gsize read_buffer_cur_size;
+ gsize read_buffer_bytes_wanted;
+ GUnixFDList *read_fd_list;
+ GSocketControlMessage **read_ancillary_messages;
+ gint read_num_ancillary_messages;
+
+ /* used for writing */
+ GMutex *write_lock;
+ GQueue *write_queue;
+ gboolean write_is_pending;
};
+struct _MessageToWriteData ;
+typedef struct _MessageToWriteData MessageToWriteData;
+
+static void message_to_write_data_free (MessageToWriteData *data);
+
static GDBusWorker *
_g_dbus_worker_ref (GDBusWorker *worker)
{
@@ -339,10 +349,17 @@ _g_dbus_worker_unref (GDBusWorker *worker)
_g_dbus_shared_thread_unref ();
g_object_unref (worker->stream);
- g_mutex_free (worker->lock);
+
+ g_mutex_free (worker->read_lock);
g_object_unref (worker->cancellable);
- if (worker->fd_list != NULL)
- g_object_unref (worker->fd_list);
+ if (worker->read_fd_list != NULL)
+ g_object_unref (worker->read_fd_list);
+
+ g_mutex_free (worker->write_lock);
+ g_queue_foreach (worker->write_queue,
+ (GFunc) message_to_write_data_free,
+ NULL);
+ g_queue_free (worker->write_queue);
g_free (worker);
}
}
@@ -366,7 +383,7 @@ _g_dbus_worker_emit_message (GDBusWorker *worker,
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
-/* called in private thread shared by all GDBusConnection instances (without lock held) */
+/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
static void
_g_dbus_worker_do_read_cb (GInputStream *input_stream,
GAsyncResult *res,
@@ -376,7 +393,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
GError *error;
gssize bytes_read;
- g_mutex_lock (worker->lock);
+ g_mutex_lock (worker->read_lock);
/* If already stopped, don't even process the reply */
if (worker->stopped)
@@ -411,9 +428,9 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
fd_message = G_UNIX_FD_MESSAGE (control_message);
fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
- if (worker->fd_list == NULL)
+ if (worker->read_fd_list == NULL)
{
- worker->fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
+ worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
}
else
{
@@ -421,7 +438,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
for (n = 0; n < num_fds; n++)
{
/* TODO: really want a append_steal() */
- g_unix_fd_list_append (worker->fd_list, fds[n], NULL);
+ g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
close (fds[n]);
}
}
@@ -522,10 +539,10 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
goto out;
}
- if (worker->fd_list != NULL)
+ if (worker->read_fd_list != NULL)
{
- g_dbus_message_set_unix_fd_list (message, worker->fd_list);
- worker->fd_list = NULL;
+ g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
+ worker->read_fd_list = NULL;
}
/* yay, got a message, go deliver it */
@@ -545,13 +562,13 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
}
out:
- g_mutex_unlock (worker->lock);
+ g_mutex_unlock (worker->read_lock);
/* gives up the reference acquired when calling g_input_stream_read_async() */
_g_dbus_worker_unref (worker);
}
-/* called in private thread shared by all GDBusConnection instances (with lock held) */
+/* called in private thread shared by all GDBusConnection instances (with read-lock held) */
static void
_g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
{
@@ -594,15 +611,214 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
}
}
-/* called in private thread shared by all GDBusConnection instances (without lock held) */
+/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
static void
_g_dbus_worker_do_read (GDBusWorker *worker)
{
- g_mutex_lock (worker->lock);
+ g_mutex_lock (worker->read_lock);
_g_dbus_worker_do_read_unlocked (worker);
- g_mutex_unlock (worker->lock);
+ g_mutex_unlock (worker->read_lock);
}
+/* ---------------------------------------------------------------------------------------------------- */
+
+struct _MessageToWriteData
+{
+ GDBusMessage *message;
+ gchar *blob;
+ gsize blob_size;
+};
+
+static void
+message_to_write_data_free (MessageToWriteData *data)
+{
+ g_object_unref (data->message);
+ g_free (data->blob);
+ g_free (data);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
+static gboolean
+write_message (GDBusWorker *worker,
+ MessageToWriteData *data,
+ GError **error)
+{
+ gboolean ret;
+
+ g_assert (data->blob_size > 16);
+
+ ret = FALSE;
+
+ /* First, the initial 16 bytes - special case UNIX sockets here
+ * since it may involve writing an ancillary message with file
+ * descriptors
+ */
+#ifdef G_OS_UNIX
+ {
+ GOutputVector vector;
+ GSocketControlMessage *message;
+ GUnixFDList *fd_list;
+ gssize bytes_written;
+
+ fd_list = g_dbus_message_get_unix_fd_list (data->message);
+
+ message = NULL;
+ if (fd_list != NULL)
+ {
+ if (!G_IS_UNIX_CONNECTION (worker->stream))
+ {
+ g_set_error (error,
+ G_IO_ERROR,
+ G_IO_ERROR_INVALID_ARGUMENT,
+ "Tried sending a file descriptor on unsupported stream of type %s",
+ g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
+ goto out;
+ }
+ else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
+ {
+ g_set_error_literal (error,
+ G_IO_ERROR,
+ G_IO_ERROR_INVALID_ARGUMENT,
+ "Tried sending a file descriptor but remote peer does not support this capability");
+ goto out;
+ }
+ message = g_unix_fd_message_new_with_fd_list (fd_list);
+ }
+
+ vector.buffer = data->blob;
+ vector.size = 16;
+
+ bytes_written = g_socket_send_message (worker->socket,
+ NULL, /* address */
+ &vector,
+ 1,
+ message != NULL ? &message : NULL,
+ message != NULL ? 1 : 0,
+ G_SOCKET_MSG_NONE,
+ worker->cancellable,
+ error);
+ if (bytes_written == -1)
+ {
+ g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
+ if (message != NULL)
+ g_object_unref (message);
+ goto out;
+ }
+ if (message != NULL)
+ g_object_unref (message);
+
+ if (bytes_written < 16)
+ {
+ /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
+ * messages are sent?
+ */
+ g_assert_not_reached ();
+ }
+ }
+#else
+ /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
+ if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
+ (const gchar *) data->blob,
+ 16,
+ NULL, /* bytes_written */
+ worker->cancellable, /* cancellable */
+ error))
+ goto out;
+#endif
+
+ /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
+ if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
+ (const gchar *) data->blob + 16,
+ data->blob_size - 16,
+ NULL, /* bytes_written */
+ worker->cancellable, /* cancellable */
+ error))
+ goto out;
+
+ ret = TRUE;
+
+ out:
+ return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+write_message_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ gboolean more_writes_are_pending;
+ MessageToWriteData *data;
+ GError *error;
+
+ g_mutex_lock (worker->write_lock);
+
+ data = g_queue_pop_head (worker->write_queue);
+ g_assert (data != NULL);
+
+ error = NULL;
+ if (!write_message (worker,
+ data,
+ &error))
+ {
+ /* TODO: handle */
+ _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+ g_error_free (error);
+ }
+ message_to_write_data_free (data);
+
+ more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
+
+ worker->write_is_pending = more_writes_are_pending;
+ g_mutex_unlock (worker->write_lock);
+
+ return more_writes_are_pending;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread - steals blob */
+void
+_g_dbus_worker_send_message (GDBusWorker *worker,
+ GDBusMessage *message,
+ gchar *blob,
+ gsize blob_len)
+{
+ MessageToWriteData *data;
+
+ g_return_if_fail (G_IS_DBUS_MESSAGE (message));
+ g_return_if_fail (blob != NULL);
+ g_return_if_fail (blob_len > 16);
+
+ data = g_new0 (MessageToWriteData, 1);
+ data->message = g_object_ref (message);
+ data->blob = blob; /* steal! */
+ data->blob_size = blob_len;
+
+ g_mutex_lock (worker->write_lock);
+ g_queue_push_tail (worker->write_queue, data);
+ if (!worker->write_is_pending)
+ {
+ GSource *idle_source;
+
+ worker->write_is_pending = TRUE;
+
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ write_message_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, shared_thread_data->context);
+ g_source_unref (idle_source);
+ }
+ g_mutex_unlock (worker->write_lock);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
static void
_g_dbus_worker_thread_begin_func (gpointer user_data)
@@ -617,6 +833,7 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
GDBusWorker *
_g_dbus_worker_new (GIOStream *stream,
+ GDBusCapabilityFlags capabilities,
GDBusWorkerMessageReceivedCallback message_received_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
gpointer user_data)
@@ -629,13 +846,18 @@ _g_dbus_worker_new (GIOStream *stream,
worker = g_new0 (GDBusWorker, 1);
worker->ref_count = 1;
- worker->lock = g_mutex_new ();
+
+ worker->read_lock = g_mutex_new ();
worker->message_received_callback = message_received_callback;
worker->disconnected_callback = disconnected_callback;
worker->user_data = user_data;
worker->stream = g_object_ref (stream);
+ worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
+ worker->write_lock = g_mutex_new ();
+ worker->write_queue = g_queue_new ();
+
if (G_IS_SOCKET_CONNECTION (worker->stream))
worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
@@ -644,14 +866,6 @@ _g_dbus_worker_new (GIOStream *stream,
return worker;
}
-void
-_g_dbus_worker_send_message (GDBusWorker *worker,
- GDBusMessage *message)
-{
- g_assert_not_reached ();
-}
-
-
/* This can be called from any thread - frees worker - guarantees no callbacks
* will ever be issued again
*/
@@ -663,10 +877,10 @@ _g_dbus_worker_stop (GDBusWorker *worker)
* we're already holding the lock...
*/
if (g_thread_self () != worker->thread)
- g_mutex_lock (worker->lock);
+ g_mutex_lock (worker->read_lock);
worker->stopped = TRUE;
if (g_thread_self () != worker->thread)
- g_mutex_unlock (worker->lock);
+ g_mutex_unlock (worker->read_lock);
g_cancellable_cancel (worker->cancellable);
_g_dbus_worker_unref (worker);
diff --git a/gdbus/gdbusprivate.h b/gdbus/gdbusprivate.h
index 3d250c6..e31f216 100644
--- a/gdbus/gdbusprivate.h
+++ b/gdbus/gdbusprivate.h
@@ -49,13 +49,16 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
* and must not block.
*/
GDBusWorker *_g_dbus_worker_new (GIOStream *stream,
+ GDBusCapabilityFlags capabilities,
GDBusWorkerMessageReceivedCallback message_received_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
gpointer user_data);
-/* can be called from any thread */
+/* can be called from any thread - steals blob */
void _g_dbus_worker_send_message (GDBusWorker *worker,
- GDBusMessage *message);
+ GDBusMessage *message,
+ gchar *blob,
+ gsize blob_len);
/* can be called from any thread */
void _g_dbus_worker_stop (GDBusWorker *worker);