diff options
Diffstat (limited to 'server/dispatcher.c')
-rw-r--r-- | server/dispatcher.c | 234 |
1 files changed, 183 insertions, 51 deletions
diff --git a/server/dispatcher.c b/server/dispatcher.c index d6c03ca7..7e71ea2d 100644 --- a/server/dispatcher.c +++ b/server/dispatcher.c @@ -1,6 +1,5 @@ -/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* - Copyright (C) 2009-2012 Red Hat, Inc. + Copyright (C) 2009-2016 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -39,6 +38,156 @@ #include <signal.h> #endif +G_DEFINE_TYPE(Dispatcher, dispatcher, G_TYPE_OBJECT) + +#define DISPATCHER_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TYPE_DISPATCHER, DispatcherPrivate)) + +struct DispatcherPrivate { + int recv_fd; + int send_fd; + pthread_t self; + pthread_mutex_t lock; + DispatcherMessage *messages; + int stage; /* message parser stage - sender has no stages */ + guint max_message_type; + void *payload; /* allocated as max of message sizes */ + size_t payload_size; /* used to track realloc calls */ + void *opaque; + dispatcher_handle_async_done handle_async_done; + dispatcher_handle_any_message any_handler; +}; + +enum { + PROP_0, + PROP_MAX_MESSAGE_TYPE, + PROP_OPAQUE +}; + +static void +dispatcher_get_property(GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + Dispatcher *self = DISPATCHER(object); + + switch (property_id) + { + case PROP_MAX_MESSAGE_TYPE: + g_value_set_uint(value, self->priv->max_message_type); + break; + case PROP_OPAQUE: + g_value_set_pointer(value, self->priv->opaque); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); + } +} + +static void +dispatcher_set_property(GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + Dispatcher *self = DISPATCHER(object); + + switch (property_id) + { + case PROP_MAX_MESSAGE_TYPE: + self->priv->max_message_type = g_value_get_uint(value); + break; + case PROP_OPAQUE: + dispatcher_set_opaque(self, g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); + } +} + +static void +dispatcher_finalize(GObject *object) +{ + Dispatcher *self = DISPATCHER(object); + g_free(self->priv->messages); + close(self->priv->send_fd); + close(self->priv->recv_fd); + pthread_mutex_destroy(&self->priv->lock); + free(self->priv->payload); + G_OBJECT_CLASS(dispatcher_parent_class)->finalize(object); +} + +static void dispatcher_constructed(GObject *object) +{ + Dispatcher *self = DISPATCHER(object); + int channels[2]; + + G_OBJECT_CLASS(dispatcher_parent_class)->constructed(object); + +#ifdef DEBUG_DISPATCHER + setup_dummy_signal_handler(); +#endif + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { + spice_error("socketpair failed %s", strerror(errno)); + return; + } + pthread_mutex_init(&self->priv->lock, NULL); + self->priv->recv_fd = channels[0]; + self->priv->send_fd = channels[1]; + self->priv->self = pthread_self(); + + self->priv->messages = g_new0(DispatcherMessage, + self->priv->max_message_type); +} + +static void +dispatcher_class_init(DispatcherClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS(klass); + + g_type_class_add_private(klass, sizeof (DispatcherPrivate)); + + object_class->get_property = dispatcher_get_property; + object_class->set_property = dispatcher_set_property; + object_class->constructed = dispatcher_constructed; + object_class->finalize = dispatcher_finalize; + + g_object_class_install_property(object_class, + PROP_MAX_MESSAGE_TYPE, + g_param_spec_uint("max-message-type", + "max-message-type", + "Maximum message type", + 0, G_MAXUINT, 0, + G_PARAM_STATIC_STRINGS | + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(object_class, + PROP_OPAQUE, + g_param_spec_pointer("opaque", + "opaque", + "User data to pass to callbacks", + G_PARAM_STATIC_STRINGS | + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT)); + +} + +static void +dispatcher_init(Dispatcher *self) +{ + self->priv = DISPATCHER_PRIVATE(self); +} + +Dispatcher * +dispatcher_new(size_t max_message_type, void *opaque) +{ + return g_object_new(TYPE_DISPATCHER, + "max-message-type", (guint) max_message_type, + "opaque", opaque, + NULL); +} + + #define ACK 0xffffffff /* @@ -118,10 +267,10 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher) int ret; uint32_t type; DispatcherMessage *msg = NULL; - uint8_t *payload = dispatcher->payload; + uint8_t *payload = dispatcher->priv->payload; uint32_t ack = ACK; - if ((ret = read_safe(dispatcher->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) { + if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) { spice_printerr("error reading from dispatcher: %d", errno); return 0; } @@ -129,28 +278,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher) /* no messsage */ return 0; } - msg = &dispatcher->messages[type]; - if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) { + msg = &dispatcher->priv->messages[type]; + if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) { spice_printerr("error reading from dispatcher: %d", errno); /* TODO: close socketpair? */ return 0; } - if (dispatcher->any_handler) { - dispatcher->any_handler(dispatcher->opaque, type, payload); + if (dispatcher->priv->any_handler) { + dispatcher->priv->any_handler(dispatcher->priv->opaque, type, payload); } if (msg->handler) { - msg->handler(dispatcher->opaque, (void *)payload); + msg->handler(dispatcher->priv->opaque, payload); } else { spice_printerr("error: no handler for message type %d", type); } if (msg->ack == DISPATCHER_ACK) { - if (write_safe(dispatcher->recv_fd, + if (write_safe(dispatcher->priv->recv_fd, (uint8_t*)&ack, sizeof(ack)) == -1) { spice_printerr("error writing ack for message %d", type); /* TODO: close socketpair? */ } - } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->handle_async_done) { - dispatcher->handle_async_done(dispatcher->opaque, type, + } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->priv->handle_async_done) { + dispatcher->priv->handle_async_done(dispatcher->priv->opaque, type, (void *)payload); } return 1; @@ -171,12 +320,12 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, { DispatcherMessage *msg; uint32_t ack; - int send_fd = dispatcher->send_fd; + int send_fd = dispatcher->priv->send_fd; - assert(dispatcher->max_message_type > message_type); - assert(dispatcher->messages[message_type].handler); - msg = &dispatcher->messages[message_type]; - pthread_mutex_lock(&dispatcher->lock); + assert(dispatcher->priv->max_message_type > message_type); + assert(dispatcher->priv->messages[message_type].handler); + msg = &dispatcher->priv->messages[message_type]; + pthread_mutex_lock(&dispatcher->priv->lock); if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) { spice_printerr("error: failed to send message type for message %d", message_type); @@ -197,15 +346,15 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, } } unlock: - pthread_mutex_unlock(&dispatcher->lock); + pthread_mutex_unlock(&dispatcher->priv->lock); } void dispatcher_register_async_done_callback( Dispatcher *dispatcher, dispatcher_handle_async_done handler) { - assert(dispatcher->handle_async_done == NULL); - dispatcher->handle_async_done = handler; + assert(dispatcher->priv->handle_async_done == NULL); + dispatcher->priv->handle_async_done = handler; } void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, @@ -214,15 +363,15 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, { DispatcherMessage *msg; - assert(message_type < dispatcher->max_message_type); - assert(dispatcher->messages[message_type].handler == 0); - msg = &dispatcher->messages[message_type]; + assert(message_type < dispatcher->priv->max_message_type); + assert(dispatcher->priv->messages[message_type].handler == 0); + msg = &dispatcher->priv->messages[message_type]; msg->handler = handler; msg->size = size; msg->ack = ack; - if (msg->size > dispatcher->payload_size) { - dispatcher->payload = realloc(dispatcher->payload, msg->size); - dispatcher->payload_size = msg->size; + if (msg->size > dispatcher->priv->payload_size) { + dispatcher->priv->payload = realloc(dispatcher->priv->payload, msg->size); + dispatcher->priv->payload_size = msg->size; } } @@ -230,7 +379,7 @@ void dispatcher_register_universal_handler( Dispatcher *dispatcher, dispatcher_handle_any_message any_handler) { - dispatcher->any_handler = any_handler; + dispatcher->priv->any_handler = any_handler; } #ifdef DEBUG_DISPATCHER @@ -257,35 +406,18 @@ static void setup_dummy_signal_handler(void) } #endif -void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, - void *opaque) +void dispatcher_set_opaque(Dispatcher *self, void *opaque) { - int channels[2]; - -#ifdef DEBUG_DISPATCHER - setup_dummy_signal_handler(); -#endif - dispatcher->opaque = opaque; - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { - spice_error("socketpair failed %s", strerror(errno)); - return; - } - pthread_mutex_init(&dispatcher->lock, NULL); - dispatcher->recv_fd = channels[0]; - dispatcher->send_fd = channels[1]; - dispatcher->self = pthread_self(); - - dispatcher->messages = spice_malloc0_n(max_message_type, - sizeof(dispatcher->messages[0])); - dispatcher->max_message_type = max_message_type; + self->priv->opaque = opaque; + g_object_notify(G_OBJECT(self), "opaque"); } -void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque) +int dispatcher_get_recv_fd(Dispatcher *dispatcher) { - dispatcher->opaque = opaque; + return dispatcher->priv->recv_fd; } -int dispatcher_get_recv_fd(Dispatcher *dispatcher) +pthread_t dispatcher_get_thread_id(Dispatcher *self) { - return dispatcher->recv_fd; + return self->priv->self; } |