/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* Copyright (C) 2009-2012 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, see . */ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #define SPICE_LOG_DOMAIN "SpiceDispatcher" #include "common/mem.h" #include "common/spice_common.h" #include "dispatcher.h" //#define DEBUG_DISPATCHER #ifdef DEBUG_DISPATCHER #include #endif #define ACK 0xffffffff /* * read_safe * helper. reads until size bytes accumulated in buf, if an error other then * EINTR is encountered returns -1, otherwise returns 0. * @block if 1 the read will block (the fd is always blocking). * if 0 poll first, return immediately if no bytes available, otherwise * read size in blocking mode. */ static int read_safe(int fd, uint8_t *buf, size_t size, int block) { int read_size = 0; int ret; struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0}; if (size == 0) { return 0; } if (!block) { while ((ret = poll(&pollfd, 1, 0)) == -1) { if (errno == EINTR) { spice_debug("EINTR in poll"); continue; } spice_error("poll failed"); return -1; } if (!(pollfd.revents & POLLIN)) { return 0; } } while (read_size < size) { ret = read(fd, buf + read_size, size - read_size); if (ret == -1) { if (errno == EINTR) { spice_debug("EINTR in read"); continue; } return -1; } if (ret == 0) { spice_error("broken pipe on read"); return -1; } read_size += ret; } return read_size; } /* * write_safe * @return -1 for error, otherwise number of written bytes. may be zero. */ static int write_safe(int fd, uint8_t *buf, size_t size) { int written_size = 0; int ret; while (written_size < size) { ret = write(fd, buf + written_size, size - written_size); if (ret == -1) { if (errno != EINTR) { spice_debug("EINTR in write"); return -1; } continue; } written_size += ret; } return written_size; } static int dispatcher_handle_single_read(Dispatcher *dispatcher) { int ret; uint32_t type; DispatcherMessage *msg = NULL; uint8_t *payload = dispatcher->payload; uint32_t ack = ACK; if ((ret = read_safe(dispatcher->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) { spice_printerr("error reading from dispatcher: %d", errno); return 0; } if (ret == 0) { /* no messsage */ return 0; } msg = &dispatcher->messages[type]; if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) { spice_printerr("error reading from dispatcher: %d", errno); /* TODO: close socketpair? */ return 0; } if (dispatcher->extra_handler) { dispatcher->extra_handler(dispatcher->opaque, type, (void *)payload); } if (msg->handler) { msg->handler(dispatcher->opaque, type, (void *)payload); } else { spice_printerr("error: no handler for message type %d", type); } if (msg->ack == DISPATCHER_ACK) { if (write_safe(dispatcher->recv_fd, (uint8_t*)&ack, sizeof(ack)) == -1) { spice_printerr("error writing ack for message %d", type); /* TODO: close socketpair? */ } } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->handle_async_done) { dispatcher->handle_async_done(dispatcher->opaque, type, (void *)payload); } return 1; } /* * dispatcher_handle_recv_read * doesn't handle being in the middle of a message. all reads are blocking. */ void dispatcher_handle_recv_read(Dispatcher *dispatcher) { while (dispatcher_handle_single_read(dispatcher)) { } } void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, void *payload) { DispatcherMessage *msg; uint32_t ack; int send_fd = dispatcher->send_fd; assert(dispatcher->max_message_type > message_type); assert(dispatcher->messages[message_type].handler); msg = &dispatcher->messages[message_type]; pthread_mutex_lock(&dispatcher->lock); if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) { spice_printerr("error: failed to send message type for message %d", message_type); goto unlock; } if (write_safe(send_fd, payload, msg->size) == -1) { spice_printerr("error: failed to send message body for message %d", message_type); goto unlock; } if (msg->ack == DISPATCHER_ACK) { if (read_safe(send_fd, (uint8_t*)&ack, sizeof(ack), 1) == -1) { spice_printerr("error: failed to read ack"); } else if (ack != ACK) { spice_printerr("error: got wrong ack value in dispatcher " "for message %d\n", message_type); /* TODO handling error? */ } } unlock: pthread_mutex_unlock(&dispatcher->lock); } uint32_t dispatcher_read_message(Dispatcher *dispatcher) { uint32_t message = 0; spice_return_val_if_fail(dispatcher, 0); spice_return_val_if_fail(dispatcher->send_fd != -1, 0); if (read_safe(dispatcher->send_fd, (uint8_t*)&message, sizeof(message), 1) == -1) spice_warn_if_reached(); return message; } void dispatcher_register_async_done_callback( Dispatcher *dispatcher, dispatcher_handle_async_done handler) { assert(dispatcher->handle_async_done == NULL); dispatcher->handle_async_done = handler; } void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, dispatcher_handle_message handler, size_t size, int ack) { DispatcherMessage *msg; assert(message_type < dispatcher->max_message_type); assert(dispatcher->messages[message_type].handler == 0); msg = &dispatcher->messages[message_type]; msg->handler = handler; msg->size = size; msg->ack = ack; if (msg->size > dispatcher->payload_size) { dispatcher->payload = realloc(dispatcher->payload, msg->size); dispatcher->payload_size = msg->size; } } void dispatcher_register_extra_handler( Dispatcher *dispatcher, dispatcher_handle_message extra_handler) { dispatcher->extra_handler = extra_handler; } #ifdef DEBUG_DISPATCHER static void dummy_handler(int bla) { } static void setup_dummy_signal_handler(void) { static int inited = 0; struct sigaction act = { .sa_handler = &dummy_handler, }; if (inited) { return; } inited = 1; /* handle SIGRTMIN+10 in order to test the loops for EINTR */ if (sigaction(SIGRTMIN + 10, &act, NULL) == -1) { fprintf(stderr, "failed to set dummy sigaction for DEBUG_DISPATCHER\n"); exit(1); } } #endif void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, void *opaque) { int channels[2]; #ifdef DEBUG_DISPATCHER setup_dummy_signal_handler(); #endif dispatcher->opaque = opaque; if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { spice_error("socketpair failed %s", strerror(errno)); return; } pthread_mutex_init(&dispatcher->lock, NULL); dispatcher->recv_fd = channels[0]; dispatcher->send_fd = channels[1]; dispatcher->self = pthread_self(); dispatcher->messages = spice_malloc0_n(max_message_type, sizeof(dispatcher->messages[0])); dispatcher->max_message_type = max_message_type; } void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque) { dispatcher->opaque = opaque; } int dispatcher_get_recv_fd(Dispatcher *dispatcher) { return dispatcher->recv_fd; } static gboolean dispatch_cb(GIOChannel *source, GIOCondition condition, gpointer data) { Dispatcher *dispatcher = data; spice_debug(NULL); dispatcher_handle_recv_read(dispatcher); /* FIXME: remove source cb if error */ return TRUE; } void dispatcher_attach(Dispatcher *dispatcher, GMainContext *main_context) { spice_return_if_fail(dispatcher != NULL); spice_return_if_fail(main_context != NULL); GIOChannel *channel = g_io_channel_unix_new(dispatcher->recv_fd); GSource *source = g_io_create_watch(channel, G_IO_IN); g_source_set_callback(source, (GSourceFunc)dispatch_cb, dispatcher, NULL); g_source_attach(source, main_context); g_source_unref(source); }