summaryrefslogtreecommitdiff
path: root/server/dispatcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/dispatcher.c')
-rw-r--r--server/dispatcher.c234
1 files changed, 183 insertions, 51 deletions
diff --git a/server/dispatcher.c b/server/dispatcher.c
index d6c03ca7..7e71ea2d 100644
--- a/server/dispatcher.c
+++ b/server/dispatcher.c
@@ -1,6 +1,5 @@
-/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
- Copyright (C) 2009-2012 Red Hat, Inc.
+ Copyright (C) 2009-2016 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
@@ -39,6 +38,156 @@
#include <signal.h>
#endif
+G_DEFINE_TYPE(Dispatcher, dispatcher, G_TYPE_OBJECT)
+
+#define DISPATCHER_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TYPE_DISPATCHER, DispatcherPrivate))
+
+struct DispatcherPrivate {
+ int recv_fd;
+ int send_fd;
+ pthread_t self;
+ pthread_mutex_t lock;
+ DispatcherMessage *messages;
+ int stage; /* message parser stage - sender has no stages */
+ guint max_message_type;
+ void *payload; /* allocated as max of message sizes */
+ size_t payload_size; /* used to track realloc calls */
+ void *opaque;
+ dispatcher_handle_async_done handle_async_done;
+ dispatcher_handle_any_message any_handler;
+};
+
+enum {
+ PROP_0,
+ PROP_MAX_MESSAGE_TYPE,
+ PROP_OPAQUE
+};
+
+static void
+dispatcher_get_property(GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ Dispatcher *self = DISPATCHER(object);
+
+ switch (property_id)
+ {
+ case PROP_MAX_MESSAGE_TYPE:
+ g_value_set_uint(value, self->priv->max_message_type);
+ break;
+ case PROP_OPAQUE:
+ g_value_set_pointer(value, self->priv->opaque);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec);
+ }
+}
+
+static void
+dispatcher_set_property(GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ Dispatcher *self = DISPATCHER(object);
+
+ switch (property_id)
+ {
+ case PROP_MAX_MESSAGE_TYPE:
+ self->priv->max_message_type = g_value_get_uint(value);
+ break;
+ case PROP_OPAQUE:
+ dispatcher_set_opaque(self, g_value_get_pointer(value));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec);
+ }
+}
+
+static void
+dispatcher_finalize(GObject *object)
+{
+ Dispatcher *self = DISPATCHER(object);
+ g_free(self->priv->messages);
+ close(self->priv->send_fd);
+ close(self->priv->recv_fd);
+ pthread_mutex_destroy(&self->priv->lock);
+ free(self->priv->payload);
+ G_OBJECT_CLASS(dispatcher_parent_class)->finalize(object);
+}
+
+static void dispatcher_constructed(GObject *object)
+{
+ Dispatcher *self = DISPATCHER(object);
+ int channels[2];
+
+ G_OBJECT_CLASS(dispatcher_parent_class)->constructed(object);
+
+#ifdef DEBUG_DISPATCHER
+ setup_dummy_signal_handler();
+#endif
+ if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
+ spice_error("socketpair failed %s", strerror(errno));
+ return;
+ }
+ pthread_mutex_init(&self->priv->lock, NULL);
+ self->priv->recv_fd = channels[0];
+ self->priv->send_fd = channels[1];
+ self->priv->self = pthread_self();
+
+ self->priv->messages = g_new0(DispatcherMessage,
+ self->priv->max_message_type);
+}
+
+static void
+dispatcher_class_init(DispatcherClass *klass)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS(klass);
+
+ g_type_class_add_private(klass, sizeof (DispatcherPrivate));
+
+ object_class->get_property = dispatcher_get_property;
+ object_class->set_property = dispatcher_set_property;
+ object_class->constructed = dispatcher_constructed;
+ object_class->finalize = dispatcher_finalize;
+
+ g_object_class_install_property(object_class,
+ PROP_MAX_MESSAGE_TYPE,
+ g_param_spec_uint("max-message-type",
+ "max-message-type",
+ "Maximum message type",
+ 0, G_MAXUINT, 0,
+ G_PARAM_STATIC_STRINGS |
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY));
+ g_object_class_install_property(object_class,
+ PROP_OPAQUE,
+ g_param_spec_pointer("opaque",
+ "opaque",
+ "User data to pass to callbacks",
+ G_PARAM_STATIC_STRINGS |
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT));
+
+}
+
+static void
+dispatcher_init(Dispatcher *self)
+{
+ self->priv = DISPATCHER_PRIVATE(self);
+}
+
+Dispatcher *
+dispatcher_new(size_t max_message_type, void *opaque)
+{
+ return g_object_new(TYPE_DISPATCHER,
+ "max-message-type", (guint) max_message_type,
+ "opaque", opaque,
+ NULL);
+}
+
+
#define ACK 0xffffffff
/*
@@ -118,10 +267,10 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
int ret;
uint32_t type;
DispatcherMessage *msg = NULL;
- uint8_t *payload = dispatcher->payload;
+ uint8_t *payload = dispatcher->priv->payload;
uint32_t ack = ACK;
- if ((ret = read_safe(dispatcher->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) {
+ if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) {
spice_printerr("error reading from dispatcher: %d", errno);
return 0;
}
@@ -129,28 +278,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
/* no messsage */
return 0;
}
- msg = &dispatcher->messages[type];
- if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
+ msg = &dispatcher->priv->messages[type];
+ if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) {
spice_printerr("error reading from dispatcher: %d", errno);
/* TODO: close socketpair? */
return 0;
}
- if (dispatcher->any_handler) {
- dispatcher->any_handler(dispatcher->opaque, type, payload);
+ if (dispatcher->priv->any_handler) {
+ dispatcher->priv->any_handler(dispatcher->priv->opaque, type, payload);
}
if (msg->handler) {
- msg->handler(dispatcher->opaque, (void *)payload);
+ msg->handler(dispatcher->priv->opaque, payload);
} else {
spice_printerr("error: no handler for message type %d", type);
}
if (msg->ack == DISPATCHER_ACK) {
- if (write_safe(dispatcher->recv_fd,
+ if (write_safe(dispatcher->priv->recv_fd,
(uint8_t*)&ack, sizeof(ack)) == -1) {
spice_printerr("error writing ack for message %d", type);
/* TODO: close socketpair? */
}
- } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->handle_async_done) {
- dispatcher->handle_async_done(dispatcher->opaque, type,
+ } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->priv->handle_async_done) {
+ dispatcher->priv->handle_async_done(dispatcher->priv->opaque, type,
(void *)payload);
}
return 1;
@@ -171,12 +320,12 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
{
DispatcherMessage *msg;
uint32_t ack;
- int send_fd = dispatcher->send_fd;
+ int send_fd = dispatcher->priv->send_fd;
- assert(dispatcher->max_message_type > message_type);
- assert(dispatcher->messages[message_type].handler);
- msg = &dispatcher->messages[message_type];
- pthread_mutex_lock(&dispatcher->lock);
+ assert(dispatcher->priv->max_message_type > message_type);
+ assert(dispatcher->priv->messages[message_type].handler);
+ msg = &dispatcher->priv->messages[message_type];
+ pthread_mutex_lock(&dispatcher->priv->lock);
if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) {
spice_printerr("error: failed to send message type for message %d",
message_type);
@@ -197,15 +346,15 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
}
}
unlock:
- pthread_mutex_unlock(&dispatcher->lock);
+ pthread_mutex_unlock(&dispatcher->priv->lock);
}
void dispatcher_register_async_done_callback(
Dispatcher *dispatcher,
dispatcher_handle_async_done handler)
{
- assert(dispatcher->handle_async_done == NULL);
- dispatcher->handle_async_done = handler;
+ assert(dispatcher->priv->handle_async_done == NULL);
+ dispatcher->priv->handle_async_done = handler;
}
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
@@ -214,15 +363,15 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
{
DispatcherMessage *msg;
- assert(message_type < dispatcher->max_message_type);
- assert(dispatcher->messages[message_type].handler == 0);
- msg = &dispatcher->messages[message_type];
+ assert(message_type < dispatcher->priv->max_message_type);
+ assert(dispatcher->priv->messages[message_type].handler == 0);
+ msg = &dispatcher->priv->messages[message_type];
msg->handler = handler;
msg->size = size;
msg->ack = ack;
- if (msg->size > dispatcher->payload_size) {
- dispatcher->payload = realloc(dispatcher->payload, msg->size);
- dispatcher->payload_size = msg->size;
+ if (msg->size > dispatcher->priv->payload_size) {
+ dispatcher->priv->payload = realloc(dispatcher->priv->payload, msg->size);
+ dispatcher->priv->payload_size = msg->size;
}
}
@@ -230,7 +379,7 @@ void dispatcher_register_universal_handler(
Dispatcher *dispatcher,
dispatcher_handle_any_message any_handler)
{
- dispatcher->any_handler = any_handler;
+ dispatcher->priv->any_handler = any_handler;
}
#ifdef DEBUG_DISPATCHER
@@ -257,35 +406,18 @@ static void setup_dummy_signal_handler(void)
}
#endif
-void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
- void *opaque)
+void dispatcher_set_opaque(Dispatcher *self, void *opaque)
{
- int channels[2];
-
-#ifdef DEBUG_DISPATCHER
- setup_dummy_signal_handler();
-#endif
- dispatcher->opaque = opaque;
- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
- spice_error("socketpair failed %s", strerror(errno));
- return;
- }
- pthread_mutex_init(&dispatcher->lock, NULL);
- dispatcher->recv_fd = channels[0];
- dispatcher->send_fd = channels[1];
- dispatcher->self = pthread_self();
-
- dispatcher->messages = spice_malloc0_n(max_message_type,
- sizeof(dispatcher->messages[0]));
- dispatcher->max_message_type = max_message_type;
+ self->priv->opaque = opaque;
+ g_object_notify(G_OBJECT(self), "opaque");
}
-void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
+int dispatcher_get_recv_fd(Dispatcher *dispatcher)
{
- dispatcher->opaque = opaque;
+ return dispatcher->priv->recv_fd;
}
-int dispatcher_get_recv_fd(Dispatcher *dispatcher)
+pthread_t dispatcher_get_thread_id(Dispatcher *self)
{
- return dispatcher->recv_fd;
+ return self->priv->self;
}