diff options
author | Yonit Halperin <yhalperi@redhat.com> | 2009-10-16 00:21:43 +0200 |
---|---|---|
committer | Yaniv Kamay <ykamay@redhat.com> | 2009-10-18 17:42:37 +0200 |
commit | ef213c66c19d265140e9a55519b174d34ff1f16b (patch) | |
tree | 0a8e58217f5757881d4d4798d1316dbb3809f37a /server | |
parent | 308e4545cbf8d26d5d47ad6ab9f2c6e6e6648003 (diff) |
tunnel
Diffstat (limited to 'server')
-rw-r--r-- | server/Makefile.am | 6 | ||||
-rw-r--r-- | server/red_channel.c | 520 | ||||
-rw-r--r-- | server/red_channel.h | 182 | ||||
-rw-r--r-- | server/red_tunnel_worker.c | 3510 | ||||
-rwxr-xr-x | server/red_tunnel_worker.h | 29 | ||||
-rw-r--r-- | server/reds.c | 17 | ||||
-rw-r--r-- | server/reds.h | 3 | ||||
-rw-r--r-- | server/vd_interface.h | 18 |
8 files changed, 4284 insertions, 1 deletions
diff --git a/server/Makefile.am b/server/Makefile.am index e6ffab4..f990961 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -9,6 +9,7 @@ INCLUDES = \ $(LOG4CPP_CFLAGS) \ $(SSL_CFLAGS) \ $(CELT051_CFLAGS) \ + $(SLIRP_CFLAGS) \ -DCAIRO_CANVAS_IMAGE_CACHE \ -DRED_STATISTICS \ $(WARN_CFLAGS) \ @@ -40,6 +41,7 @@ libspice_la_LIBADD = \ $(QCAIRO_LIBS) \ $(SSL_LIBS) \ $(CELT051_LIBS) \ + $(SLIRP_LIBS) \ $(LIBRT) \ $(NULL) @@ -64,6 +66,10 @@ libspice_la_SOURCES = \ red_yuv.h \ snd_worker.c \ snd_worker.h \ + red_channel.h \ + red_channel.c \ + red_tunnel_worker.c \ + red_tunnel_worker.h \ spice.h \ vd_interface.h \ $(COMMON_SRCS) \ diff --git a/server/red_channel.c b/server/red_channel.c new file mode 100644 index 0000000..48ace44 --- /dev/null +++ b/server/red_channel.c @@ -0,0 +1,520 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include <stdio.h> +#include <stdint.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include "red_channel.h" + +static void red_channel_receive(void *data); +static void red_channel_push(RedChannel *channel); +static void red_channel_opaque_push(void *data); +static PipeItem *red_channel_pipe_get(RedChannel *channel); +static void red_channel_pipe_clear(RedChannel *channel); + +/* return the number of bytes read. -1 in case of error */ +static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size) +{ + uint8_t *pos = buf; + while (size) { + int now; + if (peer->shutdown) { + return -1; + } + if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) { + if (now == 0) { + return -1; + } + ASSERT(now == -1); + if (errno == EAGAIN) { + break; + } else if (errno == EINTR) { + continue; + } else if (errno == EPIPE) { + return -1; + } else { + red_printf("%s", strerror(errno)); + return -1; + } + } else { + size -= now; + pos += now; + } + } + return pos - buf; +} + +static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler) +{ + int bytes_read; + + for (;;) { + int ret_handle; + if (handler->header_pos < sizeof(RedDataHeader)) { + bytes_read = red_peer_receive(peer, + ((uint8_t *)&handler->header) + handler->header_pos, + sizeof(RedDataHeader) - handler->header_pos); + if (bytes_read == -1) { + handler->on_error(handler->opaque); + return; + } + handler->header_pos += bytes_read; + + if (handler->header_pos != sizeof(RedDataHeader)) { + return; + } + } + + if (handler->msg_pos < handler->header.size) { + if (!handler->msg) { + handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header); + } + + bytes_read = red_peer_receive(peer, + handler->msg + handler->msg_pos, + handler->header.size - handler->msg_pos); + if (bytes_read == -1) { + handler->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->on_error(handler->opaque); + return; + } + handler->msg_pos += bytes_read; + if (handler->msg_pos != handler->header.size) { + return; + } + } + + ret_handle = handler->handle_message(handler->opaque, &handler->header, + handler->msg); + handler->msg_pos = 0; + handler->msg = NULL; + handler->header_pos = 0; + + if (!ret_handle) { + handler->on_error(handler->opaque); + return; + } + } +} + +static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size) +{ + struct iovec *now = vec; + + while ((skip) && (skip >= now->iov_len)) { + skip -= now->iov_len; + --*vec_size; + now++; + } + + now->iov_base = (uint8_t *)now->iov_base + skip; + now->iov_len -= skip; + return now; +} + +static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler) +{ + int n; + if (handler->size == 0) { + handler->vec = handler->vec_buf; + handler->size = handler->get_msg_size(handler->opaque); + if (!handler->size) { // nothing to be sent + return; + } + handler->prepare(handler->opaque, handler->vec, &handler->vec_size); + } + for (;;) { + if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) { + switch (errno) { + case EAGAIN: + handler->on_block(handler->opaque); + return; + case EINTR: + continue; + case EPIPE: + handler->on_error(handler->opaque); + return; + default: + red_printf("%s", strerror(errno)); + handler->on_error(handler->opaque); + return; + } + } else { + handler->pos += n; + handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size); + if (!handler->vec_size) { + if (handler->pos == handler->size) { // finished writing data + handler->on_msg_done(handler->opaque); + handler->vec = handler->vec_buf; + handler->pos = 0; + handler->size = 0; + return; + } else { + // There wasn't enough place for all the outgoing data in one iovec array. + // Filling the rest of the data. + handler->vec = handler->vec_buf; + handler->prepare(handler->opaque, handler->vec, &handler->vec_size); + } + } + } + } +} + +static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size); + + +static void red_channel_peer_on_error(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->disconnect(channel); +} + +static int red_channel_peer_get_out_msg_size(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + return channel->send_data.size; +} + +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size) +{ + RedChannel *channel = (RedChannel *)opaque; + red_channel_fill_iovec(channel, vec, vec_size); +} + +static void red_channel_peer_on_out_block(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->send_data.blocked = TRUE; + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, red_channel_opaque_push, + channel); +} + +static void red_channel_peer_on_out_msg_done(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->send_data.size = 0; + channel->send_data.n_bufs = 0; + channel->send_data.not_sent_buf_head = 0; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + channel->send_data.item = NULL; + } + if (channel->send_data.blocked) { + channel->send_data.blocked = FALSE; + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, + channel); + } +} + +RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core, + int migrate, int handle_acks, + channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, + channel_handle_message_proc handle_message, + channel_alloc_msg_recv_buf_proc alloc_recv_buf, + channel_release_msg_recv_buf_proc release_recv_buf, + channel_send_pipe_item_proc send_item, + channel_release_pipe_item_proc release_item) +{ + RedChannel *channel; + + ASSERT(size >= sizeof(*channel)); + ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf && + release_item); + if (!(channel = malloc(size))) { + red_printf("malloc failed"); + goto error1; + } + memset(channel, 0, size); + + channel->handle_acks = handle_acks; + channel->disconnect = disconnect; + channel->send_item = send_item; + channel->release_item = release_item; + + channel->peer = peer; + channel->core = core; + channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + + // block flags) + channel->ack_data.client_generation = ~0; + + channel->migrate = migrate; + ring_init(&channel->pipe); + + channel->incoming.opaque = channel; + channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; + channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; + channel->incoming.handle_message = (handle_message_proc)handle_message; + channel->incoming.on_error = red_channel_peer_on_error; + + channel->outgoing.opaque = channel; + channel->outgoing.pos = 0; + channel->outgoing.size = 0; + + channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; + channel->outgoing.prepare = red_channel_peer_prepare_out_msg; + channel->outgoing.on_block = red_channel_peer_on_out_block; + channel->outgoing.on_error = red_channel_peer_on_error; + channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; + + if (!config_socket(channel)) { + goto error2; + } + + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, channel); + + return channel; + +error2: + free(channel); +error1: + peer->cb_free(peer); + + return NULL; +} + +void red_channel_destroy(RedChannel *channel) +{ + if (!channel) { + return; + } + red_channel_pipe_clear(channel); + channel->core->set_file_handlers(channel->core, channel->peer->socket, + NULL, NULL, NULL); + channel->peer->cb_free(channel->peer); + free(channel); +} + +void red_channel_shutdown(RedChannel *channel) +{ + red_printf(""); + if (!channel->peer->shutdown) { + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, channel); + red_channel_pipe_clear(channel); + shutdown(channel->peer->socket, SHUT_RDWR); + channel->peer->shutdown = TRUE; + } +} + +void red_channel_init_outgoing_messages_window(RedChannel *channel) +{ + channel->ack_data.messages_window = 0; + red_channel_push(channel); +} + +int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg) +{ + switch (header->type) { + case REDC_ACK_SYNC: + if (header->size != sizeof(uint32_t)) { + red_printf("bad message size"); + return FALSE; + } + channel->ack_data.client_generation = *(uint32_t *)(msg); + break; + case REDC_ACK: + if (channel->ack_data.client_generation == channel->ack_data.generation) { + channel->ack_data.messages_window -= CLIENT_ACK_WINDOW; + red_channel_push(channel); + } + break; + default: + red_printf("invalid message type %u", header->type); + return FALSE; + } + return TRUE; +} + +static void red_channel_receive(void *data) +{ + RedChannel *channel = (RedChannel *)data; + red_peer_handle_incoming(channel->peer, &channel->incoming); +} + +static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size) +{ + int pos = channel->send_data.n_bufs++; + ASSERT(pos < MAX_SEND_BUFS); + channel->send_data.bufs[pos].size = size; + channel->send_data.bufs[pos].data = data; +} + +void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size) +{ + __red_channel_add_buf(channel, data, size); + channel->send_data.header.size += size; +} + +void red_channel_reset_send_data(RedChannel *channel) +{ + channel->send_data.n_bufs = 0; + channel->send_data.header.size = 0; + channel->send_data.header.sub_list = 0; + ++channel->send_data.header.serial; + __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(RedDataHeader)); +} + +void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item) +{ + channel->send_data.header.type = msg_type; + channel->send_data.item = item; +} + +static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size) +{ + BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head; + ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs); + *vec_size = 0; + do { + vec[*vec_size].iov_base = buf->data; + vec[*vec_size].iov_len = buf->size; + (*vec_size)++; + buf++; + channel->send_data.not_sent_buf_head++; + } while (((*vec_size) < MAX_SEND_VEC) && + (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs)); +} + +static void red_channel_send(RedChannel *channel) +{ + red_peer_handle_outgoing(channel->peer, &channel->outgoing); +} + +void red_channel_begin_send_massage(RedChannel *channel) +{ + channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader); + channel->ack_data.messages_window++; + red_channel_send(channel); +} + +static void red_channel_push(RedChannel *channel) +{ + PipeItem *pipe_item; + + if (!channel->during_send) { + channel->during_send = TRUE; + } else { + return; + } + + if (channel->send_data.blocked) { + red_channel_send(channel); + } + + while ((pipe_item = red_channel_pipe_get(channel))) { + channel->send_item(channel, pipe_item); + } + channel->during_send = FALSE; +} + +static void red_channel_opaque_push(void *data) +{ + red_channel_push((RedChannel *)data); +} + +uint64_t red_channel_get_message_serial(RedChannel *channel) +{ + return channel->send_data.header.serial; +} + +void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type) +{ + ring_item_init(&item->link); + item->type = type; +} + +void red_channel_pipe_add(RedChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->pipe_size++; + ring_add(&channel->pipe, &item->link); + + red_channel_push(channel); +} + +int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item) +{ + return ring_item_is_linked(&item->link); +} + +void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item) +{ + ring_remove(&item->link); +} + +void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item) +{ + ASSERT(channel); + channel->pipe_size++; + ring_add_before(&item->link, &channel->pipe); + + red_channel_push(channel); +} + +void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type) +{ + PipeItem *item = malloc(sizeof(*item)); + if (!item) { + red_error("malloc failed"); + } + red_channel_pipe_item_init(channel, item, pipe_item_type); + red_channel_pipe_add(channel, item); + + red_channel_push(channel); +} + +static PipeItem *red_channel_pipe_get(RedChannel *channel) +{ + PipeItem *item; + + if (!channel || channel->send_data.blocked || + (channel->handle_acks && (channel->ack_data.messages_window > CLIENT_ACK_WINDOW * 2)) || + !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { + return NULL; + } + + --channel->pipe_size; + ring_remove(&item->link); + return item; +} + +static void red_channel_pipe_clear(RedChannel *channel) +{ + PipeItem *item; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + } + + while ((item = (PipeItem *)ring_get_head(&channel->pipe))) { + ring_remove(&item->link); + channel->release_item(channel, item, FALSE); + } +} + diff --git a/server/red_channel.h b/server/red_channel.h new file mode 100644 index 0000000..1096ba7 --- /dev/null +++ b/server/red_channel.h @@ -0,0 +1,182 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#ifndef _H_RED_CHANNEL +#define _H_RED_CHANNEL + +#include "red_common.h" +#include "reds.h" +#include "vd_interface.h" +#include "ring.h" + +#define MAX_SEND_BUFS 1000 +#define MAX_SEND_VEC 50 +#define CLIENT_ACK_WINDOW 20 + +/* Basic interface for channels, without using the RedChannel interface. + The intention is to move towards one channel interface gradually. + At the final stage, this interface shouldn't be exposed. Only RedChannel will use it. */ + +typedef int (*handle_message_proc)(void *opaque, + RedDataHeader *header, uint8_t *msg); +typedef uint8_t *(*alloc_msg_recv_buf_proc)(void *opaque, RedDataHeader *msg_header); +typedef void (*release_msg_recv_buf_proc)(void *opaque, + RedDataHeader *msg_header, uint8_t *msg); +typedef void (*on_incoming_error_proc)(void *opaque); + +typedef struct IncomingHandler { + void *opaque; + RedDataHeader header; + uint32_t header_pos; + uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. + uint32_t msg_pos; + handle_message_proc handle_message; + alloc_msg_recv_buf_proc alloc_msg_buf; + on_incoming_error_proc on_error; // recv error or handle_message error + release_msg_recv_buf_proc release_msg_buf; // for errors +} IncomingHandler; + +typedef int (*get_outgoing_msg_size_proc)(void *opaque); +typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size); +typedef void (*on_outgoing_error_proc)(void *opaque); +typedef void (*on_outgoing_block_proc)(void *opaque); +typedef void (*on_outgoing_msg_done_proc)(void *opaque); +typedef struct OutgoingHandler { + void *opaque; + struct iovec vec_buf[MAX_SEND_VEC]; + int vec_size; + struct iovec *vec; + int pos; + int size; + get_outgoing_msg_size_proc get_msg_size; + prepare_outgoing_proc prepare; + on_outgoing_error_proc on_error; + on_outgoing_block_proc on_block; + on_outgoing_msg_done_proc on_msg_done; +} OutgoingHandler; + +/* Red Channel interface */ + +typedef struct BufDescriptor { + uint32_t size; + uint8_t *data; +} BufDescriptor; + +typedef struct PipeItem { + RingItem link; + int type; +} PipeItem; + +typedef struct RedChannel RedChannel; + +typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel, + RedDataHeader *msg_header); +typedef int (*channel_handle_message_proc)(RedChannel *channel, + RedDataHeader *header, uint8_t *msg); +typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel, + RedDataHeader *msg_header, uint8_t *msg); +typedef void (*channel_disconnect_proc)(RedChannel *channel); +typedef int (*channel_configure_socket_proc)(RedChannel *channel); +typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item); +typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, + PipeItem *item, int item_pushed); + +struct RedChannel { + RedsStreamContext *peer; + CoreInterface *core; + int migrate; + int handle_acks; + + struct { + uint32_t generation; + uint32_t client_generation; + uint32_t messages_window; + } ack_data; + + Ring pipe; + uint32_t pipe_size; + + struct { + RedDataHeader header; + union { + RedSetAck ack; + RedMigrate migrate; + } u; + uint32_t n_bufs; + BufDescriptor bufs[MAX_SEND_BUFS]; + uint32_t size; + uint32_t not_sent_buf_head; + + PipeItem *item; + int blocked; + } send_data; + + OutgoingHandler outgoing; + IncomingHandler incoming; + + channel_disconnect_proc disconnect; + channel_send_pipe_item_proc send_item; + channel_release_pipe_item_proc release_item; + + int during_send; +}; + +/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't + explicitly destroy the channel */ +RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core, + int migrate, int handle_acks, + channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, + channel_handle_message_proc handle_message, + channel_alloc_msg_recv_buf_proc alloc_recv_buf, + channel_release_msg_recv_buf_proc release_recv_buf, + channel_send_pipe_item_proc send_item, + channel_release_pipe_item_proc release_item); + +void red_channel_destroy(RedChannel *channel); + +void red_channel_shutdown(RedChannel *channel); +/* should be called when a new channel is ready to send messages */ +void red_channel_init_outgoing_messages_window(RedChannel *channel); + +/* handles general channel msgs from the client */ +int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg); + +/* when preparing send_data: should call reset, then init and then add_buf per buffer that is + being sent */ +void red_channel_reset_send_data(RedChannel *channel); +void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item); +void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size); + +uint64_t red_channel_get_message_serial(RedChannel *channel); + +/* when sending a msg. should first call red_channel_begin_send_massage */ +void red_channel_begin_send_massage(RedChannel *channel); + +void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type); +void red_channel_pipe_add(RedChannel *channel, PipeItem *item); +int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item); +void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item); +void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item); +/* for types that use this routine -> the pipe item should be freed */ +void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type); + +#endif diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c new file mode 100644 index 0000000..e4cb217 --- /dev/null +++ b/server/red_tunnel_worker.c @@ -0,0 +1,3510 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include <stdio.h> +#include <stdint.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include "red_tunnel_worker.h" +#include "red_common.h" +#include "red.h" +#include "reds.h" +#include "net_slirp.h" +#include "red_channel.h" + + +//#define DEBUG_NETWORK + +#ifdef DEBUG_NETWORK +#define PRINT_SCKT(sckt) red_printf("TUNNEL_DBG SOCKET(connection_id=%d port=%d, service=%d)",\ + sckt->connection_id, ntohs(sckt->local_port), \ + sckt->far_service->id) +#endif + +#define MAX_SOCKETS_NUM 20 + +#define MAX_SOCKET_DATA_SIZE (1024 * 2) + +#define SOCKET_WINDOW_SIZE 80 +#define SOCKET_TOKENS_TO_SEND 20 +#define SOCKET_TOKENS_TO_SEND_FOR_PROCESS 5 // sent in case the all the tokens were used by + // the client but they weren't consumed by slirp + // due to missing data for processing them and + // turning them into 'ready chunks' + +/* the number of buffer might exceed the window size when the analysis of the buffers in the + process queue need more data in order to be able to move them to the ready queue */ +#define MAX_SOCKET_IN_BUFFERS (int)(SOCKET_WINDOW_SIZE * 1.5) +#define MAX_SOCKET_OUT_BUFFERS (int)(SOCKET_WINDOW_SIZE * 1.5) + +#define CONTROL_MSG_RECV_BUF_SIZE 1024 + +typedef struct TunnelWorker TunnelWorker; + +enum { + PIPE_ITEM_TYPE_SET_ACK, + PIPE_ITEM_TYPE_MIGRATE, + PIPE_ITEM_TYPE_MIGRATE_DATA, + PIPE_ITEM_TYPE_TUNNEL_INIT, + PIPE_ITEM_TYPE_SERVICE_IP_MAP, + PIPE_ITEM_TYPE_SOCKET_OPEN, + PIPE_ITEM_TYPE_SOCKET_FIN, + PIPE_ITEM_TYPE_SOCKET_CLOSE, + PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK, + PIPE_ITEM_TYPE_SOCKET_DATA, + PIPE_ITEM_TYPE_SOCKET_TOKEN, +}; + +typedef struct RawTunneledBuffer RawTunneledBuffer; +typedef void (*release_tunneled_buffer_proc_t)(RawTunneledBuffer *buf); + +struct RawTunneledBuffer { + uint8_t *data; + int size; + int max_size; + int refs; + RawTunneledBuffer *next; + void *usr_opaque; + release_tunneled_buffer_proc_t release_proc; +}; + +static inline RawTunneledBuffer *tunneled_buffer_ref(RawTunneledBuffer *buf) +{ + buf->refs++; + return buf; +} + +static inline void tunneled_buffer_unref(RawTunneledBuffer *buf) +{ + if (!(--buf->refs)) { + buf->release_proc(buf); + } +} + +typedef struct RedSocket RedSocket; + +/* data received from the quest through slirp */ +typedef struct RedSocketRawSndBuf { + RawTunneledBuffer base; + uint8_t buf[MAX_SOCKET_DATA_SIZE]; +} RedSocketRawSndBuf; + +/* data received from the client */ +typedef struct RedSocketRawRcvBuf { + RawTunneledBuffer base; + uint8_t buf[MAX_SOCKET_DATA_SIZE + sizeof(RedcTunnelSocketData)]; + RedcTunnelSocketData *msg_info; +} RedSocketRawRcvBuf; + +typedef struct ReadyTunneledChunk ReadyTunneledChunk; + +enum { + READY_TUNNELED_CHUNK_TYPE_ORIG, + READY_TUNNELED_CHUNK_TYPE_SUB, // substitution +}; + + +/* A chunk of data from a RawTunneledBuffer (or a substitution for a part of it) + that was processed and is ready to be consumed (by slirp or by the client). + Each chunk has a reference to the RawTunneledBuffer it + was originated from. When all the reference chunks of one buffer are consumed (i.e. they are out + of the ready queue and they unrefed the buffer), the buffer is released */ +struct ReadyTunneledChunk { + uint32_t type; + RawTunneledBuffer *origin; + uint8_t *data; // if type == READY_TUNNELED_CHUNK_TYPE_ORIG, it points + // directly to the tunneled data. Otherwise, it is a + // newly allocated chunk of data + // that should be freed after its consumption. + int size; + ReadyTunneledChunk *next; +}; + +typedef struct ReadyTunneledChunkQueue { + ReadyTunneledChunk *head; + ReadyTunneledChunk *tail; + uint32_t offset; // first byte in the ready queue that wasn't consumed +} ReadyTunneledChunkQueue; + +static void ready_queue_add_orig_chunk(ReadyTunneledChunkQueue *queue, RawTunneledBuffer *origin, + uint8_t *data, int size); +static void ready_queue_pop_chunk(ReadyTunneledChunkQueue *queue); + + +enum { + PROCESS_DIRECTION_TYPE_REQUEST, // guest request + PROCESS_DIRECTION_TYPE_REPLY, // reply from the service in the client LAN +}; + +typedef struct TunneledBufferProcessQueue TunneledBufferProcessQueue; + +typedef RawTunneledBuffer *(*alloc_tunneled_buffer_proc_t)(TunneledBufferProcessQueue *queue); +/* processing the data. Notice that the buffers can be empty of + * data (see RedSocketRestoreTokensBuf) */ +typedef void (*analyze_new_data_proc_t)(TunneledBufferProcessQueue *queue, + RawTunneledBuffer *start_buf, int offset, int len); + +// migrating specific queue data (not the buffers themselves) +typedef int (*get_migrate_data_proc_t)(TunneledBufferProcessQueue *queue, void **migrate_data); +typedef void (*release_migrate_data_proc_t)(TunneledBufferProcessQueue *queue, void *migrate_data); +typedef void (*restore_proc_t)(TunneledBufferProcessQueue *queue, uint8_t *migrate_data); + +struct TunneledBufferProcessQueue { + uint32_t service_type; // which kind of processing is performed. + uint32_t direction; // reply/request + RawTunneledBuffer *head; + RawTunneledBuffer *tail; + int head_offset; + + ReadyTunneledChunkQueue *ready_chunks_queue; // the queue to push the post-process data to + + void *usr_opaque; + + alloc_tunneled_buffer_proc_t alloc_buf_proc; // for appending data to the queue + analyze_new_data_proc_t analysis_proc; // service dependent. should create the + // post-process chunks and remove buffers + // from the queue. + get_migrate_data_proc_t get_migrate_data_proc; + release_migrate_data_proc_t release_migrate_data_proc; + restore_proc_t restore_proc; +}; + +/* push and append routines are the ones that call to the analysis_proc */ +static void process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf); +static void process_queue_append(TunneledBufferProcessQueue *queue, uint8_t *data, size_t size); +static void process_queue_pop(TunneledBufferProcessQueue *queue); + +static void process_queue_clear(TunneledBufferProcessQueue *queue); + + +typedef struct RedSocketOutData { + // Note that this pipe items can appear only once in the pipe + PipeItem status_pipe_item; + PipeItem data_pipe_item; + PipeItem token_pipe_item; + + TunneledBufferProcessQueue *process_queue; // service type dependent + ReadyTunneledChunkQueue ready_chunks_queue; + ReadyTunneledChunk *push_tail; // last chunk in the ready queue that was pushed + uint32_t push_tail_size; // the subset of the push_tail that was sent + + uint32_t num_buffers; // total count of buffers in process_queue + references from ready queue + uint32_t data_size; // total size of data that is waiting to be sent. + + uint32_t num_tokens; + uint32_t window_size; +} RedSocketOutData; + +typedef struct RedSocketInData { + TunneledBufferProcessQueue *process_queue; // service type dependent + ReadyTunneledChunkQueue ready_chunks_queue; + + uint32_t num_buffers; + + int32_t num_tokens; // No. tokens conusmed by slirp since the last token msg sent to the + // client. can be negative if we loaned some to the client (when the + // ready queue is empty) + uint32_t client_total_num_tokens; +} RedSocketInData; + +typedef enum { + SLIRP_SCKT_STATUS_OPEN, + SLIRP_SCKT_STATUS_SHUTDOWN_SEND, // FIN was issued from guest + SLIRP_SCKT_STATUS_SHUTDOWN_RECV, // Triggered when FIN is received from client + SLIRP_SCKT_STATUS_DELAY_ABORT, // when out buffers overflow, we wait for client to + // close before we close slirp socket. see + //tunnel_socket_force_close + SLIRP_SCKT_STATUS_WAIT_CLOSE, // when shutdown_send was called after shut_recv + // and vice versa + SLIRP_SCKT_STATUS_CLOSED, +} SlirpSocketStatus; + +typedef enum { + CLIENT_SCKT_STATUS_WAIT_OPEN, + CLIENT_SCKT_STATUS_OPEN, + CLIENT_SCKT_STATUS_SHUTDOWN_SEND, // FIN was issued from client + CLIENT_SCKT_STATUS_CLOSED, +} ClientSocketStatus; + +typedef struct TunnelService TunnelService; +struct RedSocket { + int allocated; + + TunnelWorker *worker; + + uint16_t connection_id; + + uint16_t local_port; + TunnelService *far_service; + + ClientSocketStatus client_status; + SlirpSocketStatus slirp_status; + + int pushed_close; + int client_waits_close_ack; + + SlirpSocket *slirp_sckt; + + RedSocketOutData out_data; + RedSocketInData in_data; + + int in_slirp_send; + + uint32_t mig_client_status_msg; // the last status change msg that was received from + //the client during migration, and thus was unhandled. + // It is 0 if the status didn't change during migration + uint32_t mig_open_ack_tokens; // if REDC_TUNNEL_SOCKET_OPEN_ACK was received during + // migration, we store the tokens we received in the + // msg. +}; + +/********** managing send buffers ***********/ +static RawTunneledBuffer *tunnel_socket_alloc_snd_buf(RedSocket *sckt); +static inline RedSocketRawSndBuf *__tunnel_worker_alloc_socket_snd_buf(TunnelWorker *worker); +static RawTunneledBuffer *process_queue_alloc_snd_tunneled_buffer( + TunneledBufferProcessQueue *queue); + +static void tunnel_socket_free_snd_buf(RedSocket *sckt, RedSocketRawSndBuf *snd_buf); +static inline void __tunnel_worker_free_socket_snd_buf(TunnelWorker *worker, + RedSocketRawSndBuf *snd_buf); +static void snd_tunnled_buffer_release(RawTunneledBuffer *buf); + +/********** managing recv buffers ***********/ +// receive buffers are allocated before we know to which socket they are directed. +static inline void tunnel_socket_assign_rcv_buf(RedSocket *sckt, + RedSocketRawRcvBuf *recv_buf, int buf_size); +static inline RedSocketRawRcvBuf *__tunnel_worker_alloc_socket_rcv_buf(TunnelWorker *worker); + +static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_buf); +static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker, + RedSocketRawRcvBuf *rcv_buf); +static void rcv_tunnled_buffer_release(RawTunneledBuffer *buf); + +/********* managing buffers' queues ***********/ + +static void process_queue_simple_analysis(TunneledBufferProcessQueue *queue, + RawTunneledBuffer *start_last_added, + int offset, int len); +static inline TunneledBufferProcessQueue *__tunnel_socket_alloc_simple_process_queue( + RedSocket *sckt, + uint32_t service_type, + uint32_t direction_type); +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_request_process_queue( + RedSocket *sckt); +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_reply_process_queue( + RedSocket *sckt); +static void free_simple_process_queue(TunneledBufferProcessQueue *queue); + +typedef struct ServiceCallback { + /* allocating the the queue & setting the analysis proc by service type */ + TunneledBufferProcessQueue *(*alloc_process_queue)(RedSocket * sckt); + void (*free_process_queue)(TunneledBufferProcessQueue *queue); +} ServiceCallback; + +/* Callbacks for process queue manipulation according to the service type and + the direction of the data. + The access is performed by [service_type][direction] */ +static const ServiceCallback SERVICES_CALLBACKS[3][2] = { + {{NULL, NULL}, + {NULL, NULL}}, + {{tunnel_socket_alloc_simple_print_request_process_queue, free_simple_process_queue}, + {tunnel_socket_alloc_simple_print_reply_process_queue, free_simple_process_queue}}, + {{tunnel_socket_alloc_simple_print_request_process_queue, free_simple_process_queue}, + {tunnel_socket_alloc_simple_print_reply_process_queue, free_simple_process_queue}} +}; + +/**************************************************** +* Migration data +****************************************************/ +typedef struct TunnelChannel TunnelChannel; + +#define TUNNEL_MIGRATE_DATA_MAGIC (*(uint32_t *)"TMDA") +#define TUNNEL_MIGRATE_DATA_VERSION 1 + +#define TUNNEL_MIGRATE_NULL_OFFSET = ~0; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketOutData { + uint32_t num_tokens; + uint32_t window_size; + + uint32_t process_buf_size; + uint32_t process_buf; + + uint32_t process_queue_size; + uint32_t process_queue; + + uint32_t ready_buf_size; + uint32_t ready_buf; +} TunnelMigrateSocketOutData; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketInData { + int32_t num_tokens; + uint32_t client_total_num_tokens; + + uint32_t process_buf_size; + uint32_t process_buf; + + uint32_t process_queue_size; + uint32_t process_queue; + + uint32_t ready_buf_size; + uint32_t ready_buf; +} TunnelMigrateSocketInData; + + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocket { + uint16_t connection_id; + uint16_t local_port; + uint32_t far_service_id; + + uint16_t client_status; + uint16_t slirp_status; + + uint8_t pushed_close; + uint8_t client_waits_close_ack; + + TunnelMigrateSocketOutData out_data; + TunnelMigrateSocketInData in_data; + + uint32_t slirp_sckt; + + uint32_t mig_client_status_msg; + uint32_t mig_open_ack_tokens; +} TunnelMigrateSocket; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketList { + uint16_t num_sockets; + uint32_t sockets[0]; // offsets in TunnelMigrateData.data to TunnelMigrateSocket +} TunnelMigrateSocketList; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateService { + uint32_t type; + uint32_t id; + uint32_t group; + uint32_t port; + uint32_t name; + uint32_t description; + uint8_t virt_ip[4]; +} TunnelMigrateService; + +typedef struct __attribute__ ((__packed__)) TunnelMigratePrintService { + TunnelMigrateService base; + uint8_t ip[4]; +} TunnelMigratePrintService; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateServicesList { + uint32_t num_services; + uint32_t services[0]; +} TunnelMigrateServicesList; + +//todo: add ack_generation +typedef struct __attribute__ ((__packed__)) TunnelMigrateData { + uint32_t magic; + uint32_t version; + uint64_t message_serial; + + uint32_t slirp_state; // offset in data to slirp state + uint32_t sockets_list; // offset in data to TunnelMigrateSocketList + uint32_t services_list; + + uint8_t data[0]; +} TunnelMigrateData; + +typedef struct TunnelMigrateSocketItem { + RedSocket *socket; + TunnelMigrateSocket mig_socket; + void *out_process_queue; + void *in_process_queue; // queue data specific for service + void *slirp_socket; + uint32_t slirp_socket_size; +} TunnelMigrateSocketItem; + +typedef struct TunnelMigrateServiceItem { + TunnelService *service; + union { + TunnelMigrateService generic_service; + TunnelMigratePrintService print_service; + } u; +} TunnelMigrateServiceItem; + +typedef struct TunnelMigrateItem { + PipeItem base; + + void *slirp_state; + uint64_t slirp_state_size; + + TunnelMigrateServicesList *services_list; + uint32_t services_list_size; + + TunnelMigrateServiceItem *services; + + TunnelMigrateSocketList *sockets_list; + uint32_t sockets_list_size; + + TunnelMigrateSocketItem sockets_data[MAX_SOCKETS_NUM]; +} TunnelMigrateItem; + +static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel); + +/*******************************************************************************************/ + +/* use for signaling that 1) subroutines failed 2)routines in the interface for slirp + failed (which triggred from a call to slirp) */ +#define SET_TUNNEL_ERROR(channel,format, ...) { \ + channel->tunnel_error = TRUE; \ + red_printf(format, ## __VA_ARGS__); \ +} + +/* should be checked after each subroutine that may cause error or fter calls to slirp routines */ +#define CHECK_TUNNEL_ERROR(channel) (channel->tunnel_error) + +struct TunnelChannel { + RedChannel base; + TunnelWorker *worker; + int mig_inprogress; + int expect_migrate_mark; + int expect_migrate_data; + + int tunnel_error; + + struct { + union { + RedTunnelInit init; + RedTunnelServiceIpMap service_ip; + RedTunnelSocketOpen socket_open; + RedTunnelSocketFin socket_fin; + RedTunnelSocketClose socket_close; + RedTunnelSocketClosedAck socket_close_ack; + RedTunnelSocketData socket_data; + RedTunnelSocketTokens socket_token; + TunnelMigrateData migrate_data; + } u; + } send_data; + + uint8_t control_rcv_buf[CONTROL_MSG_RECV_BUF_SIZE]; +}; + +typedef struct RedSlirpNetworkInterface { + SlirpUsrNetworkInterface base; + TunnelWorker *worker; +} RedSlirpNetworkInterface; + +struct TunnelService { + RingItem ring_item; + PipeItem pipe_item; + uint32_t type; + uint32_t id; + uint32_t group; + uint32_t port; + char *name; + char *description; + + struct in_addr virt_ip; +}; + +typedef struct TunnelPrintService { + TunnelService base; + uint8_t ip[4]; +} TunnelPrintService; + +struct TunnelWorker { + Channel channel_interface; // for reds + TunnelChannel *channel; + + CoreInterface *core_interface; + NetWireInterface *vlan_interface; + RedSlirpNetworkInterface tunnel_interface; + RedSlirpNetworkInterface null_interface; + + RedSocket sockets[MAX_SOCKETS_NUM]; // the sockets are in the worker and not + // in the channel since the slirp sockets + // can be still alive (but during close) after + // the channel was disconnected + + int num_sockets; + + RedSocketRawSndBuf *free_snd_buf; + RedSocketRawRcvBuf *free_rcv_buf; + + Ring services; + int num_services; +}; + + +/********************************************************************* + * Tunnel interface + *********************************************************************/ +static void tunnel_channel_disconnect(RedChannel *channel); + +/* networking interface for slirp */ +static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface); +static void qemu_output(SlirpUsrNetworkInterface *usr_interface, const uint8_t *pkt, int pkt_len); +static int null_tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s); +static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s); +static void null_tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque); +static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque); +static int null_tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent); +static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent); +static int null_tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len); +static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len); +static void null_tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); +static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); +static void null_tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); +static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); + +static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface, + timer_proc_t proc, void *opaque); +static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms); + + +/* reds interface */ +static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration, + int num_common_caps, uint32_t *common_caps, int num_caps, + uint32_t *caps); +static void handle_tunnel_channel_shutdown(struct Channel *channel); +static void handle_tunnel_channel_migrate(struct Channel *channel); + + +static void tunnel_shutdown(TunnelWorker *worker) +{ + int i; + red_printf(""); + /* shutdown input from channel */ + if (worker->channel) { + red_channel_shutdown(&worker->channel->base); + } + + /* shutdown socket pipe items */ + for (i = 0; i < MAX_SOCKETS_NUM; i++) { + RedSocket *sckt = worker->sockets + i; + if (sckt->allocated) { + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + sckt->client_waits_close_ack = FALSE; + } + } + + /* shutdown input from slirp */ + net_slirp_set_net_interface(&worker->null_interface.base); +} + +/***************************************************************** +* Managing raw tunneled buffers storage +******************************************************************/ + +/********** send buffers ***********/ +static RawTunneledBuffer *tunnel_socket_alloc_snd_buf(RedSocket *sckt) +{ + RedSocketRawSndBuf *ret = __tunnel_worker_alloc_socket_snd_buf(sckt->worker); + ret->base.usr_opaque = sckt; + ret->base.release_proc = snd_tunnled_buffer_release; + sckt->out_data.num_buffers++; + return &ret->base; +} + +static inline RedSocketRawSndBuf *__tunnel_worker_alloc_socket_snd_buf(TunnelWorker *worker) +{ + RedSocketRawSndBuf *ret; + if (worker->free_snd_buf) { + ret = worker->free_snd_buf; + worker->free_snd_buf = (RedSocketRawSndBuf *)worker->free_snd_buf->base.next; + } else { + ret = (RedSocketRawSndBuf *)malloc(sizeof(*ret)); + if (!ret) { + red_error("malloc of send buf failed"); + } + } + ret->base.data = ret->buf; + ret->base.size = 0; + ret->base.max_size = MAX_SOCKET_DATA_SIZE; + ret->base.usr_opaque = NULL; + ret->base.refs = 1; + ret->base.next = NULL; + + return ret; +} + +static void tunnel_socket_free_snd_buf(RedSocket *sckt, RedSocketRawSndBuf *snd_buf) +{ + sckt->out_data.num_buffers--; + __tunnel_worker_free_socket_snd_buf(sckt->worker, snd_buf); +} + +static inline void __tunnel_worker_free_socket_snd_buf(TunnelWorker *worker, + RedSocketRawSndBuf *snd_buf) +{ + snd_buf->base.size = 0; + snd_buf->base.next = &worker->free_snd_buf->base; + worker->free_snd_buf = snd_buf; +} + +static RawTunneledBuffer *process_queue_alloc_snd_tunneled_buffer(TunneledBufferProcessQueue *queue) +{ + return tunnel_socket_alloc_snd_buf((RedSocket *)queue->usr_opaque); +} + +static void snd_tunnled_buffer_release(RawTunneledBuffer *buf) +{ + tunnel_socket_free_snd_buf((RedSocket *)buf->usr_opaque, (RedSocketRawSndBuf *)buf); +} + +/********** recv buffers ***********/ + +static inline void tunnel_socket_assign_rcv_buf(RedSocket *sckt, + RedSocketRawRcvBuf *recv_buf, int buf_size) +{ + ASSERT(!recv_buf->base.usr_opaque); + // the rcv buffer was allocated by tunnel_channel_alloc_msg_rcv_buf + // before we could know which of the sockets it belongs to, so the + // assignment to the socket is performed now + recv_buf->base.size = buf_size; + recv_buf->base.usr_opaque = sckt; + recv_buf->base.release_proc = rcv_tunnled_buffer_release; + sckt->in_data.num_buffers++; + process_queue_push(sckt->in_data.process_queue, &recv_buf->base); +} + +static inline RedSocketRawRcvBuf *__tunnel_worker_alloc_socket_rcv_buf(TunnelWorker *worker) +{ + RedSocketRawRcvBuf *ret; + if (worker->free_rcv_buf) { + ret = worker->free_rcv_buf; + worker->free_rcv_buf = (RedSocketRawRcvBuf *)worker->free_rcv_buf->base.next; + } else { + ret = (RedSocketRawRcvBuf *)malloc(sizeof(*ret)); + if (!ret) { + red_error("malloc of send buf failed"); + } + } + ret->msg_info = (RedcTunnelSocketData *)ret->buf; + ret->base.usr_opaque = NULL; + ret->base.data = ret->msg_info->data; + ret->base.size = 0; + ret->base.max_size = MAX_SOCKET_DATA_SIZE; + ret->base.refs = 1; + ret->base.next = NULL; + + return ret; +} + +static inline void __process_rcv_buf_tokens(TunnelChannel *channel, RedSocket *sckt) +{ + if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) || red_channel_pipe_item_is_linked( + &channel->base, &sckt->out_data.token_pipe_item) || channel->mig_inprogress) { + return; + } + + if ((sckt->in_data.num_tokens >= SOCKET_TOKENS_TO_SEND) || + (!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head)) { + sckt->out_data.token_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_TOKEN; + red_channel_pipe_add(&channel->base, &sckt->out_data.token_pipe_item); + } +} + +static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_buf) +{ + --sckt->in_data.num_buffers; + __tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf); + ++sckt->in_data.num_tokens; + __process_rcv_buf_tokens(sckt->worker->channel, sckt); +} + +static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker, + RedSocketRawRcvBuf *rcv_buf) +{ + rcv_buf->base.next = &worker->free_rcv_buf->base; + worker->free_rcv_buf = rcv_buf; +} + +static void rcv_tunnled_buffer_release(RawTunneledBuffer *buf) +{ + tunnel_socket_free_rcv_buf((RedSocket *)buf->usr_opaque, + (RedSocketRawRcvBuf *)buf); +} + +/************************ +* Process & Ready queue +*************************/ + +static inline void __process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf) +{ + buf->next = NULL; + if (!queue->head) { + queue->head = buf; + queue->tail = buf; + } else { + queue->tail->next = buf; + queue->tail = buf; + } +} + +static void process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf) +{ + __process_queue_push(queue, buf); + queue->analysis_proc(queue, buf, 0, buf->size); +} + +static void process_queue_append(TunneledBufferProcessQueue *queue, uint8_t *data, size_t size) +{ + RawTunneledBuffer *start_buf = NULL; + int start_offset = 0; + int copied = 0; + + if (queue->tail) { + RawTunneledBuffer *buf = queue->tail; + int space = buf->max_size - buf->size; + if (space) { + int copy_count = MIN(size, space); + start_buf = buf; + start_offset = buf->size; + memcpy(buf->data + buf->size, data, copy_count); + copied += copy_count; + buf->size += copy_count; + } + } + + + while (copied < size) { + RawTunneledBuffer *buf = queue->alloc_buf_proc(queue); + int copy_count = MIN(size - copied, buf->max_size); + memcpy(buf->data, data + copied, copy_count); + copied += copy_count; + buf->size = copy_count; + + __process_queue_push(queue, buf); + + if (!start_buf) { + start_buf = buf; + start_offset = 0; + } + } + + queue->analysis_proc(queue, start_buf, start_offset, size); +} + +static void process_queue_pop(TunneledBufferProcessQueue *queue) +{ + RawTunneledBuffer *prev_head; + ASSERT(queue->head && queue->tail); + prev_head = queue->head; + queue->head = queue->head->next; + if (!queue->head) { + queue->tail = NULL; + } + + tunneled_buffer_unref(prev_head); +} + +static void process_queue_clear(TunneledBufferProcessQueue *queue) +{ + while (queue->head) { + process_queue_pop(queue); + } +} + +static void __ready_queue_push(ReadyTunneledChunkQueue *queue, ReadyTunneledChunk *chunk) +{ + chunk->next = NULL; + if (queue->tail) { + queue->tail->next = chunk; + queue->tail = chunk; + } else { + queue->head = chunk; + queue->tail = chunk; + } +} + +static void ready_queue_add_orig_chunk(ReadyTunneledChunkQueue *queue, RawTunneledBuffer *origin, + uint8_t *data, int size) +{ + ReadyTunneledChunk *chunk = malloc(sizeof(ReadyTunneledChunk)); + chunk->type = READY_TUNNELED_CHUNK_TYPE_ORIG; + chunk->origin = tunneled_buffer_ref(origin); + chunk->data = data; + chunk->size = size; + + __ready_queue_push(queue, chunk); +} + +static void ready_queue_pop_chunk(ReadyTunneledChunkQueue *queue) +{ + ReadyTunneledChunk *chunk = queue->head; + ASSERT(queue->head); + queue->head = queue->head->next; + + if (!queue->head) { + queue->tail = NULL; + } + + tunneled_buffer_unref(chunk->origin); + if (chunk->type != READY_TUNNELED_CHUNK_TYPE_ORIG) { + free(chunk->data); + } + free(chunk); +} + +static void ready_queue_clear(ReadyTunneledChunkQueue *queue) +{ + while (queue->head) { + ready_queue_pop_chunk(queue); + } +} + +static void process_queue_simple_analysis(TunneledBufferProcessQueue *queue, + RawTunneledBuffer *start_last_added, int offset, int len) +{ + ASSERT(offset == 0); + ASSERT(start_last_added == queue->head); + + while (queue->head) { + ready_queue_add_orig_chunk(queue->ready_chunks_queue, queue->head, queue->head->data, + queue->head->size); + process_queue_pop(queue); + } +} + +static int process_queue_simple_get_migrate_data(TunneledBufferProcessQueue *queue, + void **migrate_data) +{ + *migrate_data = NULL; + return 0; +} + +static void process_queue_simple_release_migrate_data(TunneledBufferProcessQueue *queue, + void *migrate_data) +{ + ASSERT(!migrate_data); +} + +static void process_queue_simple_restore(TunneledBufferProcessQueue *queue, uint8_t *migrate_data) +{ +} + +static inline TunneledBufferProcessQueue *__tunnel_socket_alloc_simple_process_queue( + RedSocket *sckt, + uint32_t service_type, + uint32_t direction_type) +{ + TunneledBufferProcessQueue *ret_queue = malloc(sizeof(TunneledBufferProcessQueue)); + memset(ret_queue, 0, sizeof(TunneledBufferProcessQueue)); + ret_queue->service_type = service_type; + ret_queue->direction = direction_type; + ret_queue->usr_opaque = sckt; + // NO need for allocations by the process queue when getting replies. The buffer is created + // when the msg is received + if (direction_type == PROCESS_DIRECTION_TYPE_REQUEST) { + ret_queue->alloc_buf_proc = process_queue_alloc_snd_tunneled_buffer; + ret_queue->ready_chunks_queue = &sckt->out_data.ready_chunks_queue; + } else { + ret_queue->ready_chunks_queue = &sckt->in_data.ready_chunks_queue; + } + + ret_queue->analysis_proc = process_queue_simple_analysis; + + ret_queue->get_migrate_data_proc = process_queue_simple_get_migrate_data; + ret_queue->release_migrate_data_proc = process_queue_simple_release_migrate_data; + ret_queue->restore_proc = process_queue_simple_restore; + return ret_queue; +} + +static void free_simple_process_queue(TunneledBufferProcessQueue *queue) +{ + process_queue_clear(queue); + free(queue); +} + +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_request_process_queue( + RedSocket *sckt) +{ + return __tunnel_socket_alloc_simple_process_queue(sckt, + RED_TUNNEL_SERVICE_TYPE_IPP, + PROCESS_DIRECTION_TYPE_REQUEST); +} + +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_reply_process_queue( + RedSocket *sckt) +{ + return __tunnel_socket_alloc_simple_process_queue(sckt, + RED_TUNNEL_SERVICE_TYPE_IPP, + PROCESS_DIRECTION_TYPE_REPLY); +} + +static void tunnel_send_packet(void *opaque_tunnel, const uint8_t *pkt, int pkt_len) +{ + TunnelWorker *worker = (TunnelWorker *)opaque_tunnel; + ASSERT(worker); + + if (worker->channel && worker->channel->base.migrate) { + return; // during migration and the tunnel state hasn't been restored yet. + } + + net_slirp_input(pkt, pkt_len); +} + +void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface) +{ + TunnelWorker *worker = (TunnelWorker *)malloc(sizeof(TunnelWorker)); + + if (!worker) { + red_error("malloc of tunnel worker failed"); + } + memset(worker, 0, sizeof(*worker)); + + worker->core_interface = core_interface; + worker->vlan_interface = vlan_interface; + + worker->tunnel_interface.base.slirp_can_output = qemu_can_output; + worker->tunnel_interface.base.slirp_output = qemu_output; + worker->tunnel_interface.base.connect = tunnel_socket_connect; + worker->tunnel_interface.base.send = tunnel_socket_send; + worker->tunnel_interface.base.recv = tunnel_socket_recv; + worker->tunnel_interface.base.close = tunnel_socket_close; + worker->tunnel_interface.base.shutdown_recv = tunnel_socket_shutdown_recv; + worker->tunnel_interface.base.shutdown_send = tunnel_socket_shutdown_send; + worker->tunnel_interface.base.create_timer = create_timer; + worker->tunnel_interface.base.arm_timer = arm_timer; + + worker->tunnel_interface.worker = worker; + + worker->null_interface.base.slirp_can_output = qemu_can_output; + worker->null_interface.base.slirp_output = qemu_output; + worker->null_interface.base.connect = null_tunnel_socket_connect; + worker->null_interface.base.send = null_tunnel_socket_send; + worker->null_interface.base.recv = null_tunnel_socket_recv; + worker->null_interface.base.close = null_tunnel_socket_close; + worker->null_interface.base.shutdown_recv = null_tunnel_socket_shutdown_recv; + worker->null_interface.base.shutdown_send = null_tunnel_socket_shutdown_send; + worker->null_interface.base.create_timer = create_timer; + worker->null_interface.base.arm_timer = arm_timer; + + worker->null_interface.worker = worker; + + worker->channel_interface.type = RED_CHANNEL_TUNNEL; + worker->channel_interface.id = 0; + worker->channel_interface.link = handle_tunnel_channel_link; + worker->channel_interface.shutdown = handle_tunnel_channel_shutdown; + worker->channel_interface.migrate = handle_tunnel_channel_migrate; + worker->channel_interface.data = worker; + + ring_init(&worker->services); + reds_register_channel(&worker->channel_interface); + + net_slirp_init(worker->vlan_interface->get_ip(worker->vlan_interface), + TRUE, + &worker->null_interface.base); + if (!vlan_interface->register_route_packet(vlan_interface, tunnel_send_packet, worker)) { + red_error("register route packet failed"); + } + return worker; +} + +/* returns the first service that has the same group id (NULL if not found) */ +static inline TunnelService *__tunnel_worker_find_service_of_group(TunnelWorker *worker, + uint32_t group) +{ + TunnelService *service; + for (service = (TunnelService *)ring_get_head(&worker->services); + service; + service = (TunnelService *)ring_next(&worker->services, &service->ring_item)) { + if (service->group == group) { + return service; + } + } + + return NULL; +} + +static inline TunnelService *__tunnel_worker_add_service(TunnelWorker *worker, uint32_t size, + uint32_t type, uint32_t id, + uint32_t group, uint32_t port, + char *name, char *description, + struct in_addr *virt_ip) +{ + TunnelService *new_service = malloc(size); + + if (!new_service) { + red_error("malloc of TunnelService failed"); + } + memset(new_service, 0, size); + + if (!virt_ip) { + TunnelService *service_of_same_group; + if (!(service_of_same_group = __tunnel_worker_find_service_of_group(worker, group))) { + if (!net_slirp_allocate_virtual_ip(&new_service->virt_ip)) { + red_printf("failed to allocate virtual ip"); + free(new_service); + return NULL; + } + } else { + if (strcmp(name, service_of_same_group->name) == 0) { + new_service->virt_ip.s_addr = service_of_same_group->virt_ip.s_addr; + } else { + red_printf("inconsistent name for service group %d", group); + free(new_service); + return NULL; + } + } + } else { + new_service->virt_ip.s_addr = virt_ip->s_addr; + } + + ring_item_init(&new_service->ring_item); + new_service->type = type; + new_service->id = id; + new_service->group = group; + new_service->port = port; + + new_service->name = strdup(name); + new_service->description = strdup(description); + + ring_add(&worker->services, &new_service->ring_item); + worker->num_services++; + +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: ==>SERVICE ADDED: id=%d virt ip=%s port=%d name=%s desc=%s", + new_service->id, inet_ntoa(new_service->virt_ip), + new_service->port, new_service->name, new_service->description); +#endif + if (!virt_ip) { + new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP; + red_channel_pipe_add(&worker->channel->base, &new_service->pipe_item); + } + + return new_service; +} + +static TunnelService *tunnel_worker_add_service(TunnelWorker *worker, uint32_t size, + RedcTunnelAddGenericService *redc_service) +{ + return __tunnel_worker_add_service(worker, size, redc_service->type, + redc_service->id, redc_service->group, + redc_service->port, + (char *)(((uint8_t *)redc_service) + + redc_service->name), + (char *)(((uint8_t *)redc_service) + + redc_service->description), NULL); +} + +static inline void tunnel_worker_free_service(TunnelWorker *worker, TunnelService *service) +{ + ring_remove(&service->ring_item); + free(service->name); + free(service->description); + free(service); + worker->num_services--; +} + +static void tunnel_worker_free_print_service(TunnelWorker *worker, TunnelPrintService *service) +{ + tunnel_worker_free_service(worker, &service->base); +} + +static TunnelPrintService *tunnel_worker_add_print_service(TunnelWorker *worker, + RedcTunnelAddPrintService *redc_service) +{ + TunnelPrintService *service; + + service = (TunnelPrintService *)tunnel_worker_add_service(worker, sizeof(TunnelPrintService), + &redc_service->base); + + if (!service) { + return NULL; + } + + if (redc_service->ip.type == RED_TUNNEL_IP_TYPE_IPv4) { + memcpy(service->ip, redc_service->ip.data, sizeof(RedTunnelIPv4)); + } else { + red_printf("unexpected ip type=%d", redc_service->ip.type); + tunnel_worker_free_print_service(worker, service); + return NULL; + } +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: ==>PRINT SERVICE ADDED: ip=%d.%d.%d.%d", service->ip[0], + service->ip[1], service->ip[2], service->ip[3]); +#endif + return service; +} + +static int tunnel_channel_handle_service_add(TunnelChannel *channel, + RedcTunnelAddGenericService *service_msg) +{ + TunnelService *out_service = NULL; + if (service_msg->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + out_service = &tunnel_worker_add_print_service(channel->worker, + (RedcTunnelAddPrintService *) + service_msg)->base; + } else if (service_msg->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + out_service = tunnel_worker_add_service(channel->worker, sizeof(TunnelService), + service_msg); + } else { + red_printf("invalid service type"); + } + + free(service_msg); + return (out_service != NULL); +} + +static inline TunnelService *tunnel_worker_find_service_by_id(TunnelWorker *worker, uint32_t id) +{ + TunnelService *service; + for (service = (TunnelService *)ring_get_head(&worker->services); + service; + service = (TunnelService *)ring_next(&worker->services, &service->ring_item)) { + if (service->id == id) { + return service; + } + } + + return NULL; +} + +static inline TunnelService *tunnel_worker_find_service_by_addr(TunnelWorker *worker, + struct in_addr *virt_ip, + uint32_t port) +{ + TunnelService *service; + for (service = (TunnelService *)ring_get_head(&worker->services); + service; + service = (TunnelService *)ring_next(&worker->services, &service->ring_item)) { + if ((virt_ip->s_addr == service->virt_ip.s_addr) && (port == service->port)) { + return service; + } + } + + return NULL; +} + +static inline void tunnel_worker_clear_routed_network(TunnelWorker *worker) +{ + while (!ring_is_empty(&worker->services)) { + TunnelService *service = (TunnelService *)ring_get_head(&worker->services); + if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + tunnel_worker_free_service(worker, service); + } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + tunnel_worker_free_print_service(worker, (TunnelPrintService *)service); + } else { + red_error("unexpected service type"); + } + } + + net_slirp_clear_virtual_ips(); +} + +static inline RedSocket *__tunnel_worker_find_free_socket(TunnelWorker *worker) +{ + int i; + RedSocket *ret = NULL; + + if (worker->num_sockets == MAX_SOCKETS_NUM) { + return NULL; + } + + for (i = 0; i < MAX_SOCKETS_NUM; i++) { + if (!worker->sockets[i].allocated) { + ret = worker->sockets + i; + ret->connection_id = i; + break; + } + } + + ASSERT(ret); + return ret; +} + +static inline void __tunnel_worker_add_socket(TunnelWorker *worker, RedSocket *sckt) +{ + ASSERT(!sckt->allocated); + sckt->allocated = TRUE; + worker->num_sockets++; +} + +static inline void tunnel_worker_alloc_socket(TunnelWorker *worker, RedSocket *sckt, + uint16_t local_port, TunnelService *far_service, + SlirpSocket *slirp_s) +{ + ASSERT(far_service); + sckt->worker = worker; + sckt->local_port = local_port; + sckt->far_service = far_service; + sckt->out_data.num_tokens = 0; + + sckt->slirp_status = SLIRP_SCKT_STATUS_OPEN; + sckt->client_status = CLIENT_SCKT_STATUS_WAIT_OPEN; + sckt->slirp_sckt = slirp_s; + + sckt->out_data.process_queue = SERVICES_CALLBACKS[far_service->type][ + PROCESS_DIRECTION_TYPE_REQUEST].alloc_process_queue(sckt); + sckt->in_data.process_queue = SERVICES_CALLBACKS[far_service->type][ + PROCESS_DIRECTION_TYPE_REPLY].alloc_process_queue(sckt); + __tunnel_worker_add_socket(worker, sckt); +} + +static inline void __tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt) +{ + memset(sckt, 0, sizeof(*sckt)); + worker->num_sockets--; +} + +static RedSocket *tunnel_worker_create_socket(TunnelWorker *worker, uint16_t local_port, + TunnelService *far_service, + SlirpSocket *slirp_s) +{ + RedSocket *new_socket; + ASSERT(worker); + new_socket = __tunnel_worker_find_free_socket(worker); + + if (!new_socket) { + red_error("malloc of RedSocket failed"); + } + + tunnel_worker_alloc_socket(worker, new_socket, local_port, far_service, slirp_s); + + return new_socket; +} + +static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt) +{ + if (worker->channel) { + if (red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.data_pipe_item)) { + red_channel_pipe_item_remove(&worker->channel->base, + &sckt->out_data.data_pipe_item); + return; + } + + if (red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.status_pipe_item)) { + red_channel_pipe_item_remove(&worker->channel->base, + &sckt->out_data.status_pipe_item); + return; + } + + if (red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.token_pipe_item)) { + red_channel_pipe_item_remove(&worker->channel->base, + &sckt->out_data.token_pipe_item); + return; + } + } + + SERVICES_CALLBACKS[sckt->far_service->type][ + PROCESS_DIRECTION_TYPE_REQUEST].free_process_queue(sckt->out_data.process_queue); + SERVICES_CALLBACKS[sckt->far_service->type][ + PROCESS_DIRECTION_TYPE_REPLY].free_process_queue(sckt->in_data.process_queue); + + ready_queue_clear(&sckt->out_data.ready_chunks_queue); + ready_queue_clear(&sckt->in_data.ready_chunks_queue); + + __tunnel_worker_free_socket(worker, sckt); +} + +static inline RedSocket *tunnel_worker_find_socket(TunnelWorker *worker, + uint16_t local_port, + uint32_t far_service_id) +{ + RedSocket *sckt; + int allocated = 0; + + for (sckt = worker->sockets; allocated < worker->num_sockets; sckt++) { + if (sckt->allocated) { + allocated++; + if ((sckt->local_port == local_port) && + (sckt->far_service->id == far_service_id)) { + return sckt; + } + } + } + return NULL; +} + +static inline void __tunnel_socket_add_fin_to_pipe(TunnelChannel *channel, RedSocket *sckt) +{ + ASSERT(!red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)); + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_FIN; + red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item); +} + +static inline void __tunnel_socket_add_close_to_pipe(TunnelChannel *channel, RedSocket *sckt) +{ + ASSERT(!channel->mig_inprogress); + + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) { + ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN); + // close is stronger than FIN + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item); + } + sckt->pushed_close = TRUE; + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSE; + red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item); +} + +static inline void __tunnel_socket_add_close_ack_to_pipe(TunnelChannel *channel, RedSocket *sckt) +{ + ASSERT(!channel->mig_inprogress); + + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) { + ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN); + // close is stronger than FIN + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item); + } + + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK; + red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item); +} + +/* + Send close msg to the client. + If possible, notify slirp to recv data (which will return 0) + When close ack is received from client, we notify slirp (maybe again) if needed. +*/ +static void tunnel_socket_force_close(TunnelChannel *channel, RedSocket *sckt) +{ + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.token_pipe_item)) { + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.token_pipe_item); + } + + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) { + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.data_pipe_item); + } + + + if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) || + !sckt->pushed_close) { + __tunnel_socket_add_close_to_pipe(channel, sckt); + } + + // we can't call net_slirp_socket_can_receive_notify if the forced close was initiated by + // tunnel_socket_send (which was called from slirp). Instead, when + // we receive the close ack from the client, we call net_slirp_socket_can_receive_notify + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + if (!sckt->in_slirp_send) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + } else { + sckt->slirp_status = SLIRP_SCKT_STATUS_DELAY_ABORT; + } + } +} + +static int tunnel_channel_handle_socket_connect_ack(TunnelChannel *channel, RedSocket *sckt, + uint32_t tokens) +{ +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG"); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_OPEN_ACK; + sckt->mig_open_ack_tokens = tokens; + return TRUE; + } + + if (sckt->client_status != CLIENT_SCKT_STATUS_WAIT_OPEN) { + red_printf("unexpected REDC_TUNNEL_SOCKET_OPEN_ACK status=%d", sckt->client_status); + return FALSE; + } + sckt->client_status = CLIENT_SCKT_STATUS_OPEN; + + // SLIRP_SCKT_STATUS_CLOSED is possible after waiting for a connection has timed out + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + ASSERT(!sckt->pushed_close); + __tunnel_socket_add_close_to_pipe(channel, sckt); + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) { + sckt->out_data.window_size = tokens; + sckt->out_data.num_tokens = tokens; + net_slirp_socket_connected_notify(sckt->slirp_sckt); + } else { + red_printf("unexpected slirp status status=%d", sckt->slirp_status); + return FALSE; + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_connect_nack(TunnelChannel *channel, RedSocket *sckt) +{ +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_OPEN_NACK; + return TRUE; + } + + if (sckt->client_status != CLIENT_SCKT_STATUS_WAIT_OPEN) { + red_printf("unexpected REDC_TUNNEL_SOCKET_OPEN_NACK status=%d", sckt->client_status); + return FALSE; + } + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + net_slirp_socket_connect_failed_notify(sckt->slirp_sckt); + } else { + tunnel_worker_free_socket(channel->worker, sckt); + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_fin(TunnelChannel *channel, RedSocket *sckt) +{ +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_FIN; + return TRUE; + } + + if (sckt->client_status != CLIENT_SCKT_STATUS_OPEN) { + red_printf("unexpected REDC_TUNNEL_SOCKET_FIN status=%d", sckt->client_status); + return FALSE; + } + sckt->client_status = CLIENT_SCKT_STATUS_SHUTDOWN_SEND; + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT)) { + return TRUE; + } + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + // After slirp will receive all the data buffers, the next recv + // will return an error and shutdown_recv should be called. + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) { + // it already received the FIN + red_printf("unexpected slirp status=%d", sckt->slirp_status); + return FALSE; + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_closed(TunnelChannel *channel, RedSocket *sckt) +{ + int prev_client_status = sckt->client_status; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_CLOSED; + return TRUE; + } + + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + // if we already pushed close to the client, we expect it to send us ack. + // Otherwise, we will send it an ack. + if (!sckt->pushed_close) { + sckt->client_waits_close_ack = TRUE; + __tunnel_socket_add_close_ack_to_pipe(channel, sckt); + } + + return (!CHECK_TUNNEL_ERROR(channel)); + } + + // close was initiated by client + sckt->client_waits_close_ack = TRUE; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) { + // guest waits for fin: after slirp will receive all the data buffers, + // the next recv will return an error and shutdown_recv should be called. + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + } else if ((sckt->slirp_status != SLIRP_SCKT_STATUS_WAIT_CLOSE) || + (prev_client_status != CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + // slirp can be in wait close if both slirp and client sent fin perviously + // otherwise, the prev client status would also have been wait close, and this + // case was handled above + red_printf("unexpected slirp_status=%d", sckt->slirp_status); + return FALSE; + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_closed_ack(TunnelChannel *channel, RedSocket *sckt) +{ +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_CLOSED_ACK; + return TRUE; + } + + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + return (!CHECK_TUNNEL_ERROR(channel)); + } + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + red_printf("unexcpected REDC_TUNNEL_SOCKET_CLOSED_ACK slirp_status=%d", + sckt->slirp_status); + return FALSE; + } + + tunnel_worker_free_socket(channel->worker, sckt); + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_receive_data(TunnelChannel *channel, RedSocket *sckt, + RedSocketRawRcvBuf *recv_data, int buf_size) +{ + if ((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) || + (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED)) { + red_printf("unexcpected REDC_TUNNEL_SOCKET_DATA clinet_status=%d", + sckt->client_status); + return FALSE; + } + + // handling a case where the client sent data before it recieved the close msg + if ((sckt->slirp_status != SLIRP_SCKT_STATUS_OPEN) && + (sckt->slirp_status != SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data); + return (!CHECK_TUNNEL_ERROR(channel)); + } else if ((sckt->in_data.num_buffers == MAX_SOCKET_IN_BUFFERS) && + !channel->mig_inprogress && !channel->base.migrate) { + red_printf("socket in buffers overflow, socket will be closed" + " (local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data); + tunnel_socket_force_close(channel, sckt); + return (!CHECK_TUNNEL_ERROR(channel)); + } + + tunnel_socket_assign_rcv_buf(sckt, recv_data, buf_size); + if (!sckt->in_data.client_total_num_tokens) { + red_printf("token vailoation"); + return FALSE; + } + + --sckt->in_data.client_total_num_tokens; + __process_rcv_buf_tokens(channel, sckt); + + if (sckt->in_data.ready_chunks_queue.head && !channel->mig_inprogress) { + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static inline int __client_socket_can_receive(RedSocket *sckt) +{ + return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) && + !sckt->worker->channel->mig_inprogress); +} + +static int tunnel_channel_handle_socket_token(TunnelChannel *channel, RedSocket *sckt, + RedcTunnelSocketTokens *message) +{ + sckt->out_data.num_tokens += message->num_tokens; + + if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head && + !red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) { + // data is pending to be sent + sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item); + } + + return TRUE; +} + +static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, RedDataHeader *msg_header) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + if (msg_header->type == REDC_TUNNEL_SOCKET_DATA) { + return (__tunnel_worker_alloc_socket_rcv_buf(tunnel_channel->worker)->buf); + } else if ((msg_header->type == REDC_MIGRATE_DATA) || + (msg_header->type == REDC_TUNNEL_SERVICE_ADD)) { + uint8_t *ret = malloc(msg_header->size); + if (!ret) { + red_error("failed allocating"); + } + return ret; + } else { + return (tunnel_channel->control_rcv_buf); + } +} + +// called by the receive routine of the channel, before the buffer was assigned to a socket +static void tunnel_channel_release_msg_rcv_buf(RedChannel *channel, RedDataHeader *msg_header, + uint8_t *msg) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + if (msg_header->type == REDC_TUNNEL_SOCKET_DATA) { + ASSERT(!(CONTAINEROF(msg, RedSocketRawRcvBuf, buf)->base.usr_opaque)); + __tunnel_worker_free_socket_rcv_buf(tunnel_channel->worker, + CONTAINEROF(msg, RedSocketRawRcvBuf, buf)); + } +} + +static void __tunnel_channel_fill_service_migrate_item(TunnelChannel *channel, + TunnelService *service, + TunnelMigrateServiceItem *migrate_item) +{ + migrate_item->service = service; + TunnelMigrateService *general_data; + if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + general_data = &migrate_item->u.generic_service; + } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + general_data = &migrate_item->u.print_service.base; + memcpy(migrate_item->u.print_service.ip, ((TunnelPrintService *)service)->ip, 4); + } else { + red_error("unexpected service type"); + } + + general_data->type = service->type; + general_data->id = service->id; + general_data->group = service->group; + general_data->port = service->port; + memcpy(general_data->virt_ip, &service->virt_ip.s_addr, 4); +} + +static void __tunnel_channel_fill_socket_migrate_item(TunnelChannel *channel, RedSocket *sckt, + TunnelMigrateSocketItem *migrate_item) +{ + TunnelMigrateSocket *mig_sckt = &migrate_item->mig_socket; + migrate_item->socket = sckt; + mig_sckt->connection_id = sckt->connection_id; + mig_sckt->local_port = sckt->local_port; + mig_sckt->far_service_id = sckt->far_service->id; + mig_sckt->client_status = sckt->client_status; + mig_sckt->slirp_status = sckt->slirp_status; + + mig_sckt->pushed_close = sckt->pushed_close; + mig_sckt->client_waits_close_ack = sckt->client_waits_close_ack; + + mig_sckt->mig_client_status_msg = sckt->mig_client_status_msg; + mig_sckt->mig_open_ack_tokens = sckt->mig_open_ack_tokens; + + mig_sckt->out_data.num_tokens = sckt->out_data.num_tokens; + mig_sckt->out_data.window_size = sckt->out_data.window_size; + + // checking if there is a need to save the queues + if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED_ACK)) { + mig_sckt->out_data.process_queue_size = + sckt->out_data.process_queue->get_migrate_data_proc(sckt->out_data.process_queue, + &migrate_item->out_process_queue); + } + + mig_sckt->in_data.num_tokens = sckt->in_data.num_tokens; + mig_sckt->in_data.client_total_num_tokens = sckt->in_data.client_total_num_tokens; + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + mig_sckt->in_data.process_queue_size = + sckt->in_data.process_queue->get_migrate_data_proc(sckt->in_data.process_queue, + &migrate_item->in_process_queue); + } + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + migrate_item->slirp_socket_size = net_slirp_tcp_socket_export(sckt->slirp_sckt, + &migrate_item->slirp_socket); + if (!migrate_item->slirp_socket) { + SET_TUNNEL_ERROR(channel, "failed export slirp socket"); + } + } else { + migrate_item->slirp_socket_size = 0; + migrate_item->slirp_socket = NULL; + } +} + +static void release_migrate_item(TunnelMigrateItem *item); +static int tunnel_channel_handle_migrate_mark(TunnelChannel *channel) +{ + TunnelMigrateItem *migrate_item = NULL; + TunnelService *service; + TunnelMigrateServiceItem *mig_service; + int num_sockets_saved = 0; + RedSocket *sckt; + + if (!channel->expect_migrate_mark) { + red_printf("unexpected"); + return FALSE; + } + channel->expect_migrate_mark = FALSE; + migrate_item = (TunnelMigrateItem *)malloc(sizeof(*migrate_item)); + + if (!migrate_item) { + red_error("failed alloc TunnelMigrateItem"); + } + + memset(migrate_item, 0, sizeof(*migrate_item)); + migrate_item->base.type = PIPE_ITEM_TYPE_MIGRATE_DATA; + + migrate_item->slirp_state_size = net_slirp_state_export(&migrate_item->slirp_state); + if (!migrate_item->slirp_state) { + red_printf("failed export slirp state"); + goto error; + } + + migrate_item->services_list_size = sizeof(TunnelMigrateServicesList) + + (sizeof(uint32_t)*channel->worker->num_services); + migrate_item->services_list = + (TunnelMigrateServicesList *)malloc(migrate_item->services_list_size); + if (!migrate_item->services_list) { + red_error("failed alloc services list"); + } + migrate_item->services_list->num_services = channel->worker->num_services; + + migrate_item->services = (TunnelMigrateServiceItem *)malloc( + channel->worker->num_services * sizeof(TunnelMigrateServiceItem)); + + if (!migrate_item->services) { + red_error("failed alloc services items"); + } + + for (mig_service = migrate_item->services, + service = (TunnelService *)ring_get_head(&channel->worker->services); + service; + mig_service++, + service = (TunnelService *)ring_next(&channel->worker->services, &service->ring_item)) { + __tunnel_channel_fill_service_migrate_item(channel, service, mig_service); + if (CHECK_TUNNEL_ERROR(channel)) { + goto error; + } + } + + migrate_item->sockets_list_size = sizeof(TunnelMigrateSocketList) + + (sizeof(uint32_t)*channel->worker->num_sockets); + migrate_item->sockets_list = (TunnelMigrateSocketList *)malloc( + migrate_item->sockets_list_size); + if (!migrate_item->sockets_list) { + red_error("failed alloc sockets list"); + } + + migrate_item->sockets_list->num_sockets = channel->worker->num_sockets; + + for (sckt = channel->worker->sockets; num_sockets_saved < channel->worker->num_sockets; + sckt++) { + if (sckt->allocated) { + __tunnel_channel_fill_socket_migrate_item(channel, sckt, + &migrate_item->sockets_data[ + num_sockets_saved++]); + if (CHECK_TUNNEL_ERROR(channel)) { + goto error; + } + } + } + + red_channel_pipe_add((RedChannel *)channel, &migrate_item->base); + + return TRUE; +error: + release_migrate_item(migrate_item); + return FALSE; +} + +static void release_migrate_item(TunnelMigrateItem *item) +{ + if (!item) { + return; + } + + int i; + if (item->sockets_list) { + int num_sockets = item->sockets_list->num_sockets; + for (i = 0; i < num_sockets; i++) { + if (item->sockets_data[i].socket) { // handling errors in the middle of + // __tunnel_channel_fill_socket_migrate_item + if (item->sockets_data[i].out_process_queue) { + item->sockets_data[i].socket->out_data.process_queue->release_migrate_data_proc( + item->sockets_data[i].socket->out_data.process_queue, + item->sockets_data[i].out_process_queue); + } + if (item->sockets_data[i].in_process_queue) { + item->sockets_data[i].socket->in_data.process_queue->release_migrate_data_proc( + item->sockets_data[i].socket->in_data.process_queue, + item->sockets_data[i].in_process_queue); + } + } + + free(item->sockets_data[i].slirp_socket); + } + free(item->sockets_list); + } + + free(item->services); + free(item->services_list); + free(item->slirp_state); + free(item); +} + +typedef RawTunneledBuffer *(*socket_alloc_buffer_proc_t)(RedSocket *sckt); + +typedef struct RedSocketRestoreTokensBuf { + RedSocketRawRcvBuf base; + int num_tokens; +} RedSocketRestoreTokensBuf; + +// not updating tokens +static void restored_rcv_buf_release(RawTunneledBuffer *buf) +{ + RedSocket *sckt = (RedSocket *)buf->usr_opaque; + --sckt->in_data.num_buffers; + __tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf); + // for case that ready queue is empty and the client has no tokens + __process_rcv_buf_tokens(sckt->worker->channel, sckt); +} + +RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt) +{ + RedSocketRawRcvBuf *buf = __tunnel_worker_alloc_socket_rcv_buf(sckt->worker); + buf->base.usr_opaque = sckt; + buf->base.release_proc = restored_rcv_buf_release; + + sckt->in_data.num_buffers++; + return &buf->base; +} + +static void restore_tokens_buf_release(RawTunneledBuffer *buf) +{ + RedSocketRestoreTokensBuf *tokens_buf = (RedSocketRestoreTokensBuf *)buf; + RedSocket *sckt = (RedSocket *)buf->usr_opaque; + + sckt->in_data.num_tokens += tokens_buf->num_tokens; + __process_rcv_buf_tokens(sckt->worker->channel, sckt); + + free(tokens_buf); +} + +RawTunneledBuffer *__tunnel_socket_alloc_restore_tokens_buf(RedSocket *sckt, int num_tokens) +{ + RedSocketRestoreTokensBuf *buf = (RedSocketRestoreTokensBuf *)malloc(sizeof(*buf)); + if (!buf) { + red_error("failed alloc"); + } + memset(buf, 0, sizeof(*buf)); + + buf->base.base.usr_opaque = sckt; + buf->base.base.refs = 1; + buf->base.base.release_proc = restore_tokens_buf_release; + buf->num_tokens = num_tokens; +#ifdef DEBUG_NETWORK + red_printf("TUNNEL DBG: num_tokens=%d", num_tokens); +#endif + return &buf->base.base; +} + +static void __restore_ready_chunks_queue(RedSocket *sckt, ReadyTunneledChunkQueue *queue, + uint8_t *data, int size, + socket_alloc_buffer_proc_t alloc_buf) +{ + int copied = 0; + + while (copied < size) { + RawTunneledBuffer *buf = alloc_buf(sckt); + int copy_count = MIN(size - copied, buf->max_size); + memcpy(buf->data, data + copied, copy_count); + copied += copy_count; + buf->size = copy_count; + ready_queue_add_orig_chunk(queue, buf, buf->data, buf->size); + tunneled_buffer_unref(buf); + } +} + +// not using the alloc_buf cb of the queue, since we may want to create the migrated buffers +// with other properties (e.g., not releasing tokent) +static void __restore_process_queue(RedSocket *sckt, TunneledBufferProcessQueue *queue, + uint8_t *data, int size, + socket_alloc_buffer_proc_t alloc_buf) +{ + int copied = 0; + + while (copied < size) { + RawTunneledBuffer *buf = alloc_buf(sckt); + int copy_count = MIN(size - copied, buf->max_size); + memcpy(buf->data, data + copied, copy_count); + copied += copy_count; + buf->size = copy_count; + __process_queue_push(queue, buf); + } +} + +static void tunnel_channel_restore_migrated_service(TunnelChannel *channel, + TunnelMigrateService *mig_service, + uint8_t *data_buf) +{ + int service_size; + TunnelService *service; + struct in_addr virt_ip; + if (mig_service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + service_size = sizeof(TunnelService); + } else if (mig_service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + service_size = sizeof(TunnelPrintService); + } else { + SET_TUNNEL_ERROR(channel, "unexpected service type"); + return; + } + + memcpy(&virt_ip.s_addr, mig_service->virt_ip, 4); + service = __tunnel_worker_add_service(channel->worker, service_size, + mig_service->type, mig_service->id, + mig_service->group, mig_service->port, + (char *)(data_buf + mig_service->name), + (char *)(data_buf + mig_service->description), &virt_ip); + if (!service) { + SET_TUNNEL_ERROR(channel, "failed creating service"); + return; + } + + if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + TunnelMigratePrintService *mig_print_service = (TunnelMigratePrintService *)mig_service; + TunnelPrintService *print_service = (TunnelPrintService *)service; + + memcpy(print_service->ip, mig_print_service->ip, 4); + } +} + +static void tunnel_channel_restore_migrated_socket(TunnelChannel *channel, + TunnelMigrateSocket *mig_socket, + uint8_t *data_buf) +{ + RedSocket *sckt; + SlirpSocket *slirp_sckt; + RawTunneledBuffer *tokens_buf; + TunnelService *service; + sckt = channel->worker->sockets + mig_socket->connection_id; + sckt->connection_id = mig_socket->connection_id; + ASSERT(!sckt->allocated); + + /* Services must be restored before sockets */ + service = tunnel_worker_find_service_by_id(channel->worker, mig_socket->far_service_id); + if (!service) { + SET_TUNNEL_ERROR(channel, "service not found"); + return; + } + + tunnel_worker_alloc_socket(channel->worker, sckt, mig_socket->local_port, service, NULL); + + sckt->client_status = mig_socket->client_status; + sckt->slirp_status = mig_socket->slirp_status; + + sckt->mig_client_status_msg = mig_socket->mig_client_status_msg; + sckt->mig_open_ack_tokens = mig_socket->mig_open_ack_tokens; + + sckt->pushed_close = mig_socket->pushed_close; + sckt->client_waits_close_ack = mig_socket->client_waits_close_ack; + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + slirp_sckt = net_slirp_tcp_socket_restore(data_buf + mig_socket->slirp_sckt, sckt); + if (!slirp_sckt) { + SET_TUNNEL_ERROR(channel, "failed restoring slirp socket"); + return; + } + sckt->slirp_sckt = slirp_sckt; + } + // out data + sckt->out_data.num_tokens = mig_socket->out_data.num_tokens; + sckt->out_data.window_size = mig_socket->out_data.window_size; + sckt->out_data.data_size = mig_socket->out_data.process_buf_size + + mig_socket->out_data.ready_buf_size; + + __restore_ready_chunks_queue(sckt, &sckt->out_data.ready_chunks_queue, + data_buf + mig_socket->out_data.ready_buf, + mig_socket->out_data.ready_buf_size, + tunnel_socket_alloc_snd_buf); + + sckt->out_data.process_queue->restore_proc(sckt->out_data.process_queue, + data_buf + mig_socket->out_data.process_queue); + + __restore_process_queue(sckt, sckt->out_data.process_queue, + data_buf + mig_socket->out_data.process_buf, + mig_socket->out_data.process_buf_size, + tunnel_socket_alloc_snd_buf); + + sckt->in_data.client_total_num_tokens = mig_socket->in_data.client_total_num_tokens; + sckt->in_data.num_tokens = mig_socket->in_data.num_tokens; + + __restore_ready_chunks_queue(sckt, &sckt->in_data.ready_chunks_queue, + data_buf + mig_socket->in_data.ready_buf, + mig_socket->in_data.ready_buf_size, + tunnel_socket_alloc_restored_rcv_buf); + + sckt->in_data.process_queue->restore_proc(sckt->in_data.process_queue, + data_buf + mig_socket->in_data.process_queue); + + __restore_process_queue(sckt, sckt->in_data.process_queue, + data_buf + mig_socket->in_data.process_buf, + mig_socket->in_data.process_buf_size, + tunnel_socket_alloc_restored_rcv_buf); + + tokens_buf = __tunnel_socket_alloc_restore_tokens_buf(sckt, + SOCKET_WINDOW_SIZE - + (sckt->in_data.client_total_num_tokens + + sckt->in_data.num_tokens)); + if (sckt->in_data.process_queue->head) { + __process_queue_push(sckt->in_data.process_queue, tokens_buf); + } else { + ready_queue_add_orig_chunk(&sckt->in_data.ready_chunks_queue, tokens_buf, + tokens_buf->data, tokens_buf->size); + tunneled_buffer_unref(tokens_buf); + } +} + +static void tunnel_channel_restore_socket_state(TunnelChannel *channel, RedSocket *sckt) +{ + int ret = TRUE; + red_printf(""); + // handling client status msgs that were received during migration + switch (sckt->mig_client_status_msg) { + case 0: + break; + case REDC_TUNNEL_SOCKET_OPEN_ACK: + ret = tunnel_channel_handle_socket_connect_ack(channel, sckt, + sckt->mig_open_ack_tokens); + break; + case REDC_TUNNEL_SOCKET_OPEN_NACK: + ret = tunnel_channel_handle_socket_connect_nack(channel, sckt); + break; + case REDC_TUNNEL_SOCKET_FIN: + if (sckt->client_status == CLIENT_SCKT_STATUS_WAIT_OPEN) { + ret = tunnel_channel_handle_socket_connect_ack(channel, sckt, + sckt->mig_open_ack_tokens); + } + if (ret) { + ret = tunnel_channel_handle_socket_fin(channel, sckt); + } + break; + case REDC_TUNNEL_SOCKET_CLOSED: + // can't just send nack since we need to send close ack to client + if (sckt->client_status == CLIENT_SCKT_STATUS_WAIT_OPEN) { + ret = tunnel_channel_handle_socket_connect_ack(channel, sckt, + sckt->mig_open_ack_tokens); + } + ret = ret & tunnel_channel_handle_socket_closed(channel, sckt); + + break; + case REDC_TUNNEL_SOCKET_CLOSED_ACK: + ret = tunnel_channel_handle_socket_closed_ack(channel, sckt); + break; + default: + SET_TUNNEL_ERROR(channel, "invalid message type %u", sckt->mig_client_status_msg); + return; + } + + if (!ret) { + SET_TUNNEL_ERROR(channel, "failed restoring socket state"); + return; + } + sckt->mig_client_status_msg = 0; + sckt->mig_open_ack_tokens = 0; + + // handling data transfer + if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head) { + if (!red_channel_pipe_item_is_linked( + &channel->base, &sckt->out_data.data_pipe_item)) { + sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item); + } + } + + if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) && + sckt->in_data.ready_chunks_queue.head) { + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } + + if (CHECK_TUNNEL_ERROR(channel)) { + return; + } + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) { + net_slirp_socket_can_send_notify(sckt->slirp_sckt); + } + + if (CHECK_TUNNEL_ERROR(channel)) { + return; + } + // for cases where the client has no tokens left, but all the data is in the process queue. + __process_rcv_buf_tokens(channel, sckt); +} + +static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel) +{ + // if we are overgoing migration again, no need to restore the state, we will wait + // for the next host. + if (!channel->mig_inprogress) { + int num_activated = 0; + RedSocket *sckt = channel->worker->sockets; + + for (; num_activated < channel->worker->num_sockets; sckt++) { + if (sckt->allocated) { + tunnel_channel_restore_socket_state(channel, sckt); + + if (CHECK_TUNNEL_ERROR(channel)) { + return; + } + + num_activated++; + } + } + net_slirp_unfreeze(); + } +} + +static int tunnel_channel_handle_migrate_data(TunnelChannel *channel, + TunnelMigrateData *migrate_data) +{ + TunnelMigrateSocketList *sockets_list; + TunnelMigrateServicesList *services_list; + int i; + + if (!channel->expect_migrate_data) { + red_printf("unexpected"); + goto error; + } + channel->expect_migrate_data = FALSE; + + if (migrate_data->magic != TUNNEL_MIGRATE_DATA_MAGIC || + migrate_data->version != TUNNEL_MIGRATE_DATA_VERSION) { + red_printf("invalid content"); + goto error; + } + + ASSERT(channel->base.send_data.header.serial == 0); + channel->base.send_data.header.serial = migrate_data->message_serial; + + net_slirp_state_restore(migrate_data->data + migrate_data->slirp_state); + + services_list = (TunnelMigrateServicesList *)(migrate_data->data + + migrate_data->services_list); + for (i = 0; i < services_list->num_services; i++) { + tunnel_channel_restore_migrated_service(channel, + (TunnelMigrateService *)(migrate_data->data + + services_list->services[i]), + migrate_data->data); + if (CHECK_TUNNEL_ERROR(channel)) { + red_printf("failed restoring service"); + goto error; + } + } + + sockets_list = (TunnelMigrateSocketList *)(migrate_data->data + migrate_data->sockets_list); + + for (i = 0; i < sockets_list->num_sockets; i++) { + tunnel_channel_restore_migrated_socket(channel, + (TunnelMigrateSocket *)(migrate_data->data + + sockets_list->sockets[i]), + migrate_data->data); + if (CHECK_TUNNEL_ERROR(channel)) { + red_printf("failed restoring socket"); + goto error; + } + } + + // activate channel + channel->base.migrate = FALSE; + red_channel_init_outgoing_messages_window(&channel->base); + + tunnel_channel_activate_migrated_sockets(channel); + + if (CHECK_TUNNEL_ERROR(channel)) { + goto error; + } + free(migrate_data); + return TRUE; +error: + free(migrate_data); + return FALSE; +} + +// msg was allocated by tunnel_channel_alloc_msg_rcv_buf +static int tunnel_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + RedSocket *sckt = NULL; + // retrieve the sckt + switch (header->type) { + case REDC_MIGRATE_FLUSH_MARK: + case REDC_MIGRATE_DATA: + case REDC_TUNNEL_SERVICE_ADD: + case REDC_TUNNEL_SERVICE_REMOVE: + break; + case REDC_TUNNEL_SOCKET_OPEN_ACK: + case REDC_TUNNEL_SOCKET_OPEN_NACK: + case REDC_TUNNEL_SOCKET_DATA: + case REDC_TUNNEL_SOCKET_FIN: + case REDC_TUNNEL_SOCKET_CLOSED: + case REDC_TUNNEL_SOCKET_CLOSED_ACK: + case REDC_TUNNEL_SOCKET_TOKEN: + // the first field in these messages is connection id + sckt = tunnel_channel->worker->sockets + (*((uint16_t *)msg)); + if (!sckt->allocated) { + red_printf("red socket not found"); + return FALSE; + } + break; + default: + return red_channel_handle_message(channel, header, msg); + } + + switch (header->type) { + case REDC_TUNNEL_SERVICE_ADD: + if (header->size < sizeof(RedcTunnelAddGenericService)) { + red_printf("bad message size"); + free(msg); + return FALSE; + } + return tunnel_channel_handle_service_add(tunnel_channel, + (RedcTunnelAddGenericService *)msg); + case REDC_TUNNEL_SERVICE_REMOVE: + red_printf("REDC_TUNNEL_REMOVE_SERVICE not supported yet"); + return FALSE; + case REDC_TUNNEL_SOCKET_OPEN_ACK: + if (header->size != sizeof(RedcTunnelSocketOpenAck)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_connect_ack(tunnel_channel, sckt, + ((RedcTunnelSocketOpenAck *)msg)->tokens); + + case REDC_TUNNEL_SOCKET_OPEN_NACK: + if (header->size != sizeof(RedcTunnelSocketOpenNack)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_connect_nack(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_DATA: + { + if (header->size < sizeof(RedcTunnelSocketData)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_receive_data(tunnel_channel, sckt, + CONTAINEROF(msg, RedSocketRawRcvBuf, buf), + header->size - sizeof(RedcTunnelSocketData)); + } + case REDC_TUNNEL_SOCKET_FIN: + if (header->size != sizeof(RedcTunnelSocketFin)) { + red_printf("bad message size"); + return FALSE; + } + return tunnel_channel_handle_socket_fin(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_CLOSED: + if (header->size != sizeof(RedcTunnelSocketClosed)) { + red_printf("bad message size"); + return FALSE; + } + return tunnel_channel_handle_socket_closed(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_CLOSED_ACK: + if (header->size != sizeof(RedcTunnelSocketClosedAck)) { + red_printf("bad message size"); + return FALSE; + } + return tunnel_channel_handle_socket_closed_ack(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_TOKEN: + if (header->size != sizeof(RedcTunnelSocketTokens)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_token(tunnel_channel, sckt, + (RedcTunnelSocketTokens *)msg); + case REDC_MIGRATE_FLUSH_MARK: + return tunnel_channel_handle_migrate_mark(tunnel_channel); + case REDC_MIGRATE_DATA: + if (header->size < sizeof(TunnelMigrateData)) { + red_printf("bad message size"); + free(msg); + return FALSE; + } + return tunnel_channel_handle_migrate_data(tunnel_channel, (TunnelMigrateData *)msg); + default: + return red_channel_handle_message(channel, header, msg); + } + return TRUE; +} + +/********************************/ +/* outgoing msgs +********************************/ + +static void tunnel_channel_send_set_ack(TunnelChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->base.send_data.u.ack.generation = ++channel->base.ack_data.generation; + channel->base.send_data.u.ack.window = CLIENT_ACK_WINDOW; + + red_channel_init_send_data(&channel->base, RED_SET_ACK, item); + red_channel_add_buf(&channel->base, &channel->base.send_data.u.ack, sizeof(RedSetAck)); + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_migrate(TunnelChannel *channel, PipeItem *item) +{ + ASSERT(channel); + channel->base.send_data.u.migrate.flags = RED_MIGRATE_NEED_FLUSH | + RED_MIGRATE_NEED_DATA_TRANSFER; + channel->expect_migrate_mark = TRUE; + red_channel_init_send_data(&channel->base, RED_MIGRATE, item); + red_channel_add_buf(&channel->base, &channel->base.send_data.u.migrate, sizeof(RedMigrate)); + red_channel_begin_send_massage(&channel->base); +} + +static int __tunnel_channel_send_process_bufs_migrate_data(TunnelChannel *channel, + TunneledBufferProcessQueue *queue) +{ + int buf_offset = queue->head_offset; + RawTunneledBuffer *buf = queue->head; + int size = 0; + + while (buf) { + red_channel_add_buf(&channel->base, buf->data + buf_offset, buf->size - buf_offset); + size += buf->size - buf_offset; + buf_offset = 0; + buf = buf->next; + } + + return size; +} + +static int __tunnel_channel_send_ready_bufs_migrate_data(TunnelChannel *channel, + ReadyTunneledChunkQueue *queue) +{ + int offset = queue->offset; + ReadyTunneledChunk *chunk = queue->head; + int size = 0; + + while (chunk) { + red_channel_add_buf(&channel->base, chunk->data + offset, chunk->size - offset); + size += chunk->size - offset; + offset = 0; + chunk = chunk->next; + } + return size; +} + +// returns the size to send +static int __tunnel_channel_send_service_migrate_data(TunnelChannel *channel, + TunnelMigrateServiceItem *item, + int offset) +{ + TunnelService *service = item->service; + int cur_offset = offset; + TunnelMigrateService *generic_data; + + if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + generic_data = &item->u.generic_service; + red_channel_add_buf(&channel->base, &item->u.generic_service, + sizeof(item->u.generic_service)); + cur_offset += sizeof(item->u.generic_service); + } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + generic_data = &item->u.print_service.base; + red_channel_add_buf(&channel->base, &item->u.print_service, + sizeof(item->u.print_service)); + cur_offset += sizeof(item->u.print_service); + } else { + red_error("unexpected service type"); + } + + generic_data->name = cur_offset; + red_channel_add_buf(&channel->base, service->name, strlen(service->name) + 1); + cur_offset += strlen(service->name) + 1; + + generic_data->description = cur_offset; + red_channel_add_buf(&channel->base, service->description, strlen(service->description) + 1); + cur_offset += strlen(service->description) + 1; + + return (cur_offset - offset); +} + +// returns the size to send +static int __tunnel_channel_send_socket_migrate_data(TunnelChannel *channel, + TunnelMigrateSocketItem *item, int offset) +{ + RedSocket *sckt = item->socket; + TunnelMigrateSocket *mig_sckt = &item->mig_socket; + int cur_offset = offset; + red_channel_add_buf(&channel->base, mig_sckt, sizeof(*mig_sckt)); + cur_offset += sizeof(*mig_sckt); + + if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED_ACK)) { + mig_sckt->out_data.process_buf = cur_offset; + mig_sckt->out_data.process_buf_size = + __tunnel_channel_send_process_bufs_migrate_data(channel, + sckt->out_data.process_queue); + cur_offset += mig_sckt->out_data.process_buf_size; + if (mig_sckt->out_data.process_queue_size) { + mig_sckt->out_data.process_queue = cur_offset; + red_channel_add_buf(&channel->base, item->out_process_queue, + mig_sckt->out_data.process_queue_size); + cur_offset += mig_sckt->out_data.process_queue_size; + } + mig_sckt->out_data.ready_buf = cur_offset; + mig_sckt->out_data.ready_buf_size = + __tunnel_channel_send_ready_bufs_migrate_data(channel, + &sckt->out_data.ready_chunks_queue); + cur_offset += mig_sckt->out_data.ready_buf_size; + } else { + mig_sckt->out_data.process_buf_size = 0; + mig_sckt->out_data.ready_buf_size = 0; + } + + // notice that we migrate the received buffers without the msg headers. + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + mig_sckt->in_data.process_buf = cur_offset; + mig_sckt->in_data.process_buf_size = + __tunnel_channel_send_process_bufs_migrate_data(channel, + sckt->in_data.process_queue); + cur_offset += mig_sckt->in_data.process_buf_size; + if (mig_sckt->in_data.process_queue_size) { + mig_sckt->in_data.process_queue = cur_offset; + red_channel_add_buf(&channel->base, item->in_process_queue, + mig_sckt->in_data.process_queue_size); + cur_offset += mig_sckt->in_data.process_queue_size; + } + mig_sckt->in_data.ready_buf = cur_offset; + mig_sckt->in_data.ready_buf_size = + __tunnel_channel_send_ready_bufs_migrate_data(channel, + &sckt->in_data.ready_chunks_queue); + cur_offset += mig_sckt->in_data.ready_buf_size; + } else { + mig_sckt->in_data.process_buf_size = 0; + mig_sckt->in_data.ready_buf_size = 0; + } + + if (item->slirp_socket_size) { // zero if socket is closed + red_channel_add_buf(&channel->base, item->slirp_socket, item->slirp_socket_size); + mig_sckt->slirp_sckt = cur_offset; + cur_offset += item->slirp_socket_size; + } + return (cur_offset - offset); +} + +static void tunnel_channel_send_migrate_data(TunnelChannel *channel, PipeItem *item) +{ + TunnelMigrateData *migrate_data = &channel->send_data.u.migrate_data; + TunnelMigrateItem *migrate_item = (TunnelMigrateItem *)item; + int i; + + uint32_t data_buf_offset = 0; // current location in data[0] field + ASSERT(channel); + + migrate_data->magic = TUNNEL_MIGRATE_DATA_MAGIC; + migrate_data->version = TUNNEL_MIGRATE_DATA_VERSION; + migrate_data->message_serial = red_channel_get_message_serial(&channel->base); + red_channel_init_send_data(&channel->base, RED_MIGRATE_DATA, item); + red_channel_add_buf(&channel->base, migrate_data, sizeof(*migrate_data)); + + migrate_data->slirp_state = data_buf_offset; + red_channel_add_buf(&channel->base, migrate_item->slirp_state, migrate_item->slirp_state_size); + data_buf_offset += migrate_item->slirp_state_size; + + migrate_data->services_list = data_buf_offset; + red_channel_add_buf(&channel->base, migrate_item->services_list, + migrate_item->services_list_size); + data_buf_offset += migrate_item->services_list_size; + + for (i = 0; i < migrate_item->services_list->num_services; i++) { + migrate_item->services_list->services[i] = data_buf_offset; + data_buf_offset += __tunnel_channel_send_service_migrate_data(channel, + migrate_item->services + i, + data_buf_offset); + } + + + migrate_data->sockets_list = data_buf_offset; + red_channel_add_buf(&channel->base, migrate_item->sockets_list, + migrate_item->sockets_list_size); + data_buf_offset += migrate_item->sockets_list_size; + + for (i = 0; i < migrate_item->sockets_list->num_sockets; i++) { + migrate_item->sockets_list->sockets[i] = data_buf_offset; + data_buf_offset += __tunnel_channel_send_socket_migrate_data(channel, + migrate_item->sockets_data + i, + data_buf_offset); + } + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_init(TunnelChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->send_data.u.init.max_socket_data_size = MAX_SOCKET_DATA_SIZE; + channel->send_data.u.init.max_num_of_sockets = MAX_SOCKETS_NUM; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_INIT, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.init, sizeof(RedTunnelInit)); + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_service_ip_map(TunnelChannel *channel, PipeItem *item) +{ + TunnelService *service = CONTAINEROF(item, TunnelService, pipe_item); + + channel->send_data.u.service_ip.service_id = service->id; + channel->send_data.u.service_ip.virtual_ip.type = RED_TUNNEL_IP_TYPE_IPv4; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SERVICE_IP_MAP, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.service_ip, + sizeof(RedTunnelServiceIpMap)); + red_channel_add_buf(&channel->base, &service->virt_ip.s_addr, sizeof(RedTunnelIPv4)); + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_socket_open(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + channel->send_data.u.socket_open.connection_id = sckt->connection_id; + channel->send_data.u.socket_open.service_id = sckt->far_service->id; + channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE; + + sckt->in_data.client_total_num_tokens = SOCKET_WINDOW_SIZE; + sckt->in_data.num_tokens = 0; + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_OPEN, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_open, + sizeof(channel->send_data.u.socket_open)); + + red_channel_begin_send_massage(&channel->base); +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif +} + +static void tunnel_channel_send_socket_fin(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + ASSERT(!sckt->out_data.ready_chunks_queue.head); + + if (sckt->out_data.process_queue->head) { + red_printf("socket sent FIN but there are still buffers in outgoing process queue" + "(local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + } + + channel->send_data.u.socket_fin.connection_id = sckt->connection_id; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_FIN, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_fin, + sizeof(channel->send_data.u.socket_fin)); + + red_channel_begin_send_massage(&channel->base); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif +} + +static void tunnel_channel_send_socket_close(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + // can happen when it is a forced close + if (sckt->out_data.ready_chunks_queue.head) { + red_printf("socket closed but there are still buffers in outgoing ready queue" + "(local_port=%d, service_id=%d)", + ntohs(sckt->local_port), + sckt->far_service->id); + } + + if (sckt->out_data.process_queue->head) { + red_printf("socket closed but there are still buffers in outgoing process queue" + "(local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + } + + channel->send_data.u.socket_close.connection_id = sckt->connection_id; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_CLOSE, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close, + sizeof(channel->send_data.u.socket_close)); + + red_channel_begin_send_massage(&channel->base); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif +} + +static void tunnel_channel_send_socket_closed_ack(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id; + + // pipe item is null because we free the sckt. + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_CLOSED_ACK, NULL); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close_ack, + sizeof(channel->send_data.u.socket_close_ack)); + + red_channel_begin_send_massage(&channel->base); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + + ASSERT(sckt->client_waits_close_ack && (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED)); + tunnel_worker_free_socket(channel->worker, sckt); + if (CHECK_TUNNEL_ERROR(channel)) { + tunnel_shutdown(channel->worker); + } +} + +static void tunnel_channel_send_socket_token(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, token_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + /* notice that the num of tokens sent can be > SOCKET_TOKENS_TO_SEND, since + the sending is performed after the pipe item was pushed */ + + channel->send_data.u.socket_token.connection_id = sckt->connection_id; + + if (sckt->in_data.num_tokens > 0) { + channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens; + } else { + ASSERT(!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head); + channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS; + } + sckt->in_data.num_tokens -= channel->send_data.u.socket_token.num_tokens; + sckt->in_data.client_total_num_tokens += channel->send_data.u.socket_token.num_tokens; + ASSERT(sckt->in_data.client_total_num_tokens <= SOCKET_WINDOW_SIZE); + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_TOKEN, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_token, + sizeof(channel->send_data.u.socket_token)); + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_socket_out_data(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, data_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + ReadyTunneledChunk *chunk; + uint32_t total_push_size = 0; + uint32_t pushed_bufs_num = 0; + + ASSERT(!sckt->pushed_close); + if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { + return; + } + + if (!sckt->out_data.num_tokens) { + return; // only when an we will recieve tokens, data will be sent again. + } + + ASSERT(sckt->out_data.ready_chunks_queue.head); + ASSERT(!sckt->out_data.push_tail); + ASSERT(sckt->out_data.ready_chunks_queue.head->size <= MAX_SOCKET_DATA_SIZE); + + channel->send_data.u.socket_data.connection_id = sckt->connection_id; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_DATA, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_data, + sizeof(channel->send_data.u.socket_data)); + pushed_bufs_num++; + + // the first chunk is in a valid size + chunk = sckt->out_data.ready_chunks_queue.head; + total_push_size = chunk->size - sckt->out_data.ready_chunks_queue.offset; + red_channel_add_buf(&channel->base, chunk->data + sckt->out_data.ready_chunks_queue.offset, + total_push_size); + pushed_bufs_num++; + sckt->out_data.push_tail = chunk; + sckt->out_data.push_tail_size = chunk->size; // all the chunk was sent + + chunk = chunk->next; + + while (chunk && (total_push_size < MAX_SOCKET_DATA_SIZE) && (pushed_bufs_num < MAX_SEND_BUFS)) { + uint32_t cur_push_size = MIN(chunk->size, MAX_SOCKET_DATA_SIZE - total_push_size); + red_channel_add_buf(&channel->base, chunk->data, cur_push_size); + pushed_bufs_num++; + + sckt->out_data.push_tail = chunk; + sckt->out_data.push_tail_size = cur_push_size; + total_push_size += cur_push_size; + + chunk = chunk->next; + } + + sckt->out_data.num_tokens--; + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, data_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + ASSERT(sckt_out_data->ready_chunks_queue.head); + + while (sckt_out_data->ready_chunks_queue.head != sckt_out_data->push_tail) { + sckt_out_data->data_size -= sckt_out_data->ready_chunks_queue.head->size; + ready_queue_pop_chunk(&sckt_out_data->ready_chunks_queue); + } + + sckt_out_data->data_size -= sckt_out_data->push_tail_size; + + // compansation. was substructed in the previous lines + sckt_out_data->data_size += sckt_out_data->ready_chunks_queue.offset; + + if (sckt_out_data->push_tail_size == sckt_out_data->push_tail->size) { + ready_queue_pop_chunk(&sckt_out_data->ready_chunks_queue); + sckt_out_data->ready_chunks_queue.offset = 0; + } else { + sckt_out_data->ready_chunks_queue.offset = sckt_out_data->push_tail_size; + } + + sckt_out_data->push_tail = NULL; + sckt_out_data->push_tail_size = 0; + + if (worker->channel) { + // can still send data to socket + if (__client_socket_can_receive(sckt)) { + if (sckt_out_data->ready_chunks_queue.head) { + // the pipe item may alreay be linked, if for example the send was + // blocked and before it finshed and called release, tunnel_socket_send was called + if (!red_channel_pipe_item_is_linked( + &worker->channel->base, &sckt_out_data->data_pipe_item)) { + sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item); + } + } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { + __tunnel_socket_add_fin_to_pipe(worker->channel, sckt); + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + __tunnel_socket_add_close_to_pipe(worker->channel, sckt); + } + } + } + + + if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) && + !sckt->in_slirp_send && !worker->channel->mig_inprogress) { + // for cases that slirp couldn't write whole it data to our socket buffer + net_slirp_socket_can_send_notify(sckt->slirp_sckt); + } +} + +static void tunnel_channel_send_item(RedChannel *channel, PipeItem *item) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + + red_channel_reset_send_data(channel); + switch (item->type) { + case PIPE_ITEM_TYPE_SET_ACK: + tunnel_channel_send_set_ack(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_TUNNEL_INIT: + tunnel_channel_send_init(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SERVICE_IP_MAP: + tunnel_channel_send_service_ip_map(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_OPEN: + tunnel_channel_send_socket_open(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_DATA: + tunnel_channel_send_socket_out_data(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_FIN: + tunnel_channel_send_socket_fin(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_CLOSE: + tunnel_channel_send_socket_close(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK: + tunnel_channel_send_socket_closed_ack(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_TOKEN: + tunnel_channel_send_socket_token(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_MIGRATE: + tunnel_channel_send_migrate(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_MIGRATE_DATA: + tunnel_channel_send_migrate_data(tunnel_channel, item); + break; + default: + red_error("invalid pipe item type"); + } +} + +/* param item_pushed: distinguishes between a pipe item that was pushed for sending, and + a pipe item that is still in the pipe and is released due to disconnection. + see red_pipe_item_clear */ +static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item, int item_pushed) +{ + if (!item) { // e.g. when acking closed socket + return; + } + switch (item->type) { + case PIPE_ITEM_TYPE_SET_ACK: + case PIPE_ITEM_TYPE_TUNNEL_INIT: + free(item); + break; + case PIPE_ITEM_TYPE_SERVICE_IP_MAP: + case PIPE_ITEM_TYPE_SOCKET_OPEN: + case PIPE_ITEM_TYPE_SOCKET_CLOSE: + case PIPE_ITEM_TYPE_SOCKET_FIN: + case PIPE_ITEM_TYPE_SOCKET_TOKEN: + break; + case PIPE_ITEM_TYPE_SOCKET_DATA: + if (item_pushed) { + tunnel_worker_release_socket_out_data(((TunnelChannel *)channel)->worker, item); + } + break; + case PIPE_ITEM_TYPE_MIGRATE: + free(item); + break; + case PIPE_ITEM_TYPE_MIGRATE_DATA: + release_migrate_item((TunnelMigrateItem *)item); + break; + default: + red_error("invalid pipe item type"); + } +} + +/*********************************************************** +* interface for slirp +************************************************************/ + +static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface) +{ + TunnelWorker *worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + return worker->vlan_interface->can_send_packet(worker->vlan_interface); +} + +static void qemu_output(SlirpUsrNetworkInterface *usr_interface, const uint8_t *pkt, int pkt_len) +{ + TunnelWorker *worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + worker->vlan_interface->send_packet(worker->vlan_interface, pkt, pkt_len); +} + +static int null_tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s) +{ + errno = ENETUNREACH; + return -1; +} + +static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s) +{ + TunnelWorker *worker; + RedSocket *sckt; + TunnelService *far_service; + + ASSERT(usr_interface); + +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG"); +#endif + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + ASSERT(worker->channel); + ASSERT(!worker->channel->mig_inprogress); + + far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port)); + + if (!far_service) { + errno = EADDRNOTAVAIL; + return -1; + } + + if (tunnel_worker_find_socket(worker, src_port, far_service->id)) { + red_printf("slirp tried to open a socket that is still opened"); + errno = EADDRINUSE; + return -1; + } + + if (worker->num_sockets == MAX_SOCKETS_NUM) { + red_printf("number of tunneled sockets exceeds the limit"); + errno = ENFILE; + return -1; + } + + sckt = tunnel_worker_create_socket(worker, src_port, far_service, slirp_s); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + *o_usr_s = sckt; + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN; + red_channel_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item); + + errno = EINPROGRESS; + return -1; +} + +static int null_tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent) +{ + errno = ECONNRESET; + return -1; +} + +static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent) +{ + TunnelWorker *worker; + RedSocket *sckt; + size_t size_to_send; + + ASSERT(usr_interface); + ASSERT(opaque); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + ASSERT(!worker->channel->mig_inprogress); + + sckt = (RedSocket *)opaque; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + errno = EAGAIN; + return -1; + } + + if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) && + (sckt->client_status != CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + red_printf("client socket is unable to receive data"); + errno = ECONNRESET; + return -1; + } + + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { + red_printf("send was shutdown"); + errno = EPIPE; + return -1; + } + + if (urgent) { + SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported"); + tunnel_shutdown(worker); + errno = ECONNRESET; + return -1; + } + + sckt->in_slirp_send = TRUE; + + if (sckt->out_data.data_size < (sckt->out_data.window_size) * MAX_SOCKET_DATA_SIZE) { + // the current data in the queues doesn't fill all the tokens + size_to_send = len; + } else { + if (sckt->out_data.ready_chunks_queue.head) { + // there are no tokens for future data, but once the data will be sent + // and buffers will be released, we will try to send again. + size_to_send = 0; + } else { + ASSERT(sckt->out_data.process_queue->head); + if ((sckt->out_data.data_size + len) > + (MAX_SOCKET_OUT_BUFFERS * MAX_SOCKET_DATA_SIZE)) { + red_printf("socket out buffers overflow, socket will be closed" + " (local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + tunnel_socket_force_close(worker->channel, sckt); + size_to_send = 0; + } else { + size_to_send = len; + } + } + } + + if (size_to_send) { + process_queue_append(sckt->out_data.process_queue, buf, size_to_send); + sckt->out_data.data_size += size_to_send; + + if (sckt->out_data.ready_chunks_queue.head && + !red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.data_pipe_item)) { + sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item); + } + } + + sckt->in_slirp_send = FALSE; + + if (!size_to_send) { + errno = EAGAIN; + return -1; + } else { + return size_to_send; + } +} + +static inline int __should_send_fin_to_guest(RedSocket *sckt) +{ + return (((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) || + ((sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) && + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND))) && + !sckt->in_data.ready_chunks_queue.head); +} + +static int null_tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len) +{ + errno = ECONNRESET; + return -1; +} + +static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len) +{ + TunnelWorker *worker; + RedSocket *sckt; + int copied = 0; + + ASSERT(usr_interface); + ASSERT(opaque); + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + ASSERT(!worker->channel->mig_inprogress); + + sckt = (RedSocket *)opaque; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + errno = EAGAIN; + return -1; + } + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { + SET_TUNNEL_ERROR(worker->channel, "recieve was shutdown"); + tunnel_shutdown(worker); + errno = ECONNRESET; + return -1; + } + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected"); + tunnel_shutdown(worker); + errno = ECONNRESET; + return -1; + } + + ASSERT((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) || + ((sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) && + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND))); + + + // if there is data in ready queue, when it is acked, slirp will call recv and get 0 + if (__should_send_fin_to_guest(sckt)) { + if (sckt->in_data.process_queue->head) { + red_printf("client socket sent FIN but there are still buffers in incoming process" + "queue (local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + } + return 0; // slirp will call shutdown recv now and it will also send FIN to the guest. + } + + while (sckt->in_data.ready_chunks_queue.head && (copied < len)) { + ReadyTunneledChunk *cur_chunk = sckt->in_data.ready_chunks_queue.head; + int copy_count = MIN(cur_chunk->size - sckt->in_data.ready_chunks_queue.offset, + len - copied); + + memcpy(buf + copied, cur_chunk->data + sckt->in_data.ready_chunks_queue.offset, copy_count); + copied += copy_count; + if ((sckt->in_data.ready_chunks_queue.offset + copy_count) == cur_chunk->size) { + ready_queue_pop_chunk(&sckt->in_data.ready_chunks_queue); + sckt->in_data.ready_chunks_queue.offset = 0; + } else { + ASSERT(copied == len); + sckt->in_data.ready_chunks_queue.offset += copy_count; + } + } + + if (!copied) { + errno = EAGAIN; + return -1; + } else { + return copied; + } +} + +static void null_tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque) +{ +} + +// can be called : 1) when a FIN is reqested from the guset 2) after shutdown rcv that was called +// after recieved failed because the client socket was sent FIN +static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + sckt = (RedSocket *)opaque; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + ASSERT(!worker->channel->mig_inprogress); + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + return; + } + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) { + sckt->slirp_status = SLIRP_SCKT_STATUS_SHUTDOWN_SEND; + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) { + ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND); + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + } else { + SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d", + sckt->slirp_status); + tunnel_shutdown(worker); + return; + } + + if ((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + // check if there is still data to send. the fin will be sent after data is released + // channel is alive, otherwise the sockets would have been aborted + if (!sckt->out_data.ready_chunks_queue.head) { + __tunnel_socket_add_fin_to_pipe(worker->channel, sckt); + } + } else { // if client is closed, it means the connection was aborted since we didn't + // received fin from guest + SET_TUNNEL_ERROR(worker->channel, + "unexpected tunnel_socket_shutdown_send client_status=%d", + sckt->client_status); + tunnel_shutdown(worker); + } +} + +static void null_tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque) +{ +} + +static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + sckt = (RedSocket *)opaque; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + ASSERT(!worker->channel->mig_inprogress); + + /* failure in recv can happen after the client sckt was shutdown + (after client sent FIN, or after slirp sent FIN and client socket was closed */ + if (!__should_send_fin_to_guest(sckt)) { + SET_TUNNEL_ERROR(worker->channel, + "unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d", + sckt->client_status, sckt->slirp_status); + tunnel_shutdown(worker); + return; + } + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) { + sckt->slirp_status = SLIRP_SCKT_STATUS_SHUTDOWN_RECV; + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + } else { + SET_TUNNEL_ERROR(worker->channel, + "unexpected tunnel_socket_shutdown_recv slirp_status=%d", + sckt->slirp_status); + tunnel_shutdown(worker); + } +} + +static void null_tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + sckt = (RedSocket *)opaque; +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + sckt->slirp_status = SLIRP_SCKT_STATUS_CLOSED; + + if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { + tunnel_worker_free_socket(worker, sckt); + } // else, it will be closed when disconnect will be called (because this callback is + // set if the channel is disconnect or when we are in the middle of disconnection that + // was caused by an error +} + +// can be called during migration due to the channel diconnect. But it does not affect the +// migrate data +static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + sckt = (RedSocket *)opaque; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + + sckt->slirp_status = SLIRP_SCKT_STATUS_CLOSED; + + // if sckt is not opened yet, close will be sent when we recieve connect ack + if ((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + // check if there is still data to send. the close will be sent after data is released. + // close may already been pushed if it is a forced close + if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) { + __tunnel_socket_add_close_to_pipe(worker->channel, sckt); + } + } else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { + if (sckt->client_waits_close_ack) { + __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt); + } else { + tunnel_worker_free_socket(worker, sckt); + } + } +} + +static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface, + timer_proc_t proc, void *opaque) +{ + TunnelWorker *worker; + + ASSERT(usr_interface); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + return (void *)worker->core_interface->create_timer(worker->core_interface, + proc, opaque); +} + +static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms) +{ + TunnelWorker *worker; + + ASSERT(usr_interface); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; +#ifdef DEBUG_NETWORK + if (!worker->channel) { + red_printf("channel not connected"); + } +#endif + if (worker->channel && worker->channel->mig_inprogress) { + SET_TUNNEL_ERROR(worker->channel, "during migration"); + tunnel_shutdown(worker); + return; + } + + worker->core_interface->arm_timer(worker->core_interface, (VDObjectRef)timer, ms); +} + +/*********************************************** +* channel interface and other related procedures +************************************************/ + +static int tunnel_channel_config_socket(RedChannel *channel) +{ + int flags; + int delay_val; + + if ((flags = fcntl(channel->peer->socket, F_GETFL)) == -1) { + red_printf("accept failed, %s", strerror(errno)); // can't we just use red_error? + return FALSE; + } + + if (fcntl(channel->peer->socket, F_SETFL, flags | O_NONBLOCK) == -1) { + red_printf("accept failed, %s", strerror(errno)); + return FALSE; + } + + delay_val = 1; + + if (setsockopt(channel->peer->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + sizeof(delay_val)) == -1) { + red_printf("setsockopt failed, %s", strerror(errno)); + } + + return TRUE; +} + +static void tunnel_worker_disconnect_slirp(TunnelWorker *worker) +{ + int i; + + net_slirp_set_net_interface(&worker->null_interface.base); + for (i = 0; i < MAX_SOCKETS_NUM; i++) { + RedSocket *sckt = worker->sockets + i; + if (sckt->allocated) { + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + sckt->client_waits_close_ack = FALSE; + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + tunnel_worker_free_socket(worker, sckt); + } else { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + } + } + } +} + +/* don't call disconnect from functions that might be called by slirp + since it closes all its sockets and slirp is not aware of it */ +static void tunnel_channel_disconnect(RedChannel *channel) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + TunnelWorker *worker; + if (!channel) { + return; + } + red_printf(""); + worker = tunnel_channel->worker; + + tunnel_worker_disconnect_slirp(worker); + + tunnel_worker_clear_routed_network(worker); + red_channel_destroy(channel); + worker->channel = NULL; +} + +/* interface for reds */ + +static void on_new_tunnel_channel(TunnelChannel *channel) +{ + red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_SET_ACK); + + if (channel->base.migrate) { + channel->expect_migrate_data = TRUE; + } else { + red_channel_init_outgoing_messages_window(&channel->base); + red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_TUNNEL_INIT); + } +} + +static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration, + int num_common_caps, uint32_t *common_caps, int num_caps, + uint32_t *caps) +{ + TunnelChannel *tunnel_channel; + TunnelWorker *worker = (TunnelWorker *)channel->data; + if (worker->channel) { + tunnel_channel_disconnect(&worker->channel->base); + } + + tunnel_channel = + (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), peer, worker->core_interface, + migration, TRUE, + tunnel_channel_config_socket, + tunnel_channel_disconnect, + tunnel_channel_handle_message, + tunnel_channel_alloc_msg_rcv_buf, + tunnel_channel_release_msg_rcv_buf, + tunnel_channel_send_item, + tunnel_channel_release_pipe_item); + + if (!tunnel_channel) { + return; + } + + + tunnel_channel->worker = worker; + tunnel_channel->worker->channel = tunnel_channel; + net_slirp_set_net_interface(&worker->tunnel_interface.base); + + on_new_tunnel_channel(tunnel_channel); +} + +static void handle_tunnel_channel_shutdown(struct Channel *channel) +{ + tunnel_channel_disconnect(&((TunnelWorker *)channel->data)->channel->base); +} + +static void handle_tunnel_channel_migrate(struct Channel *channel) +{ +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: MIGRATE STARTED"); +#endif + TunnelChannel *tunnel_channel = ((TunnelWorker *)channel->data)->channel; + tunnel_channel->mig_inprogress = TRUE; + net_slirp_freeze(); + red_channel_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE); +} + diff --git a/server/red_tunnel_worker.h b/server/red_tunnel_worker.h new file mode 100755 index 0000000..5d00a34 --- /dev/null +++ b/server/red_tunnel_worker.h @@ -0,0 +1,29 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#ifndef _H_RED_TUNNEL_WORKER +#define _H_RED_TUNNEL_WORKER + +#include "vd_interface.h" + +void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface); + +#endif diff --git a/server/reds.c b/server/reds.c index 067304d..4630e95 100644 --- a/server/reds.c +++ b/server/reds.c @@ -47,6 +47,7 @@ #include "red_common.h" #include "red_dispatcher.h" #include "snd_worker.h" +#include "red_tunnel_worker.h" #include "reds_stat.h" #include "stat.h" #include "ring.h" @@ -83,6 +84,7 @@ static pthread_mutex_t *lock_cs; static long *lock_count; uint32_t streaming_video = TRUE; image_compression_t image_compression = IMAGE_COMPRESS_AUTO_GLZ; +void *red_tunnel = NULL; int agent_mouse = TRUE; static void openssl_init(); @@ -3921,7 +3923,7 @@ int __attribute__ ((visibility ("default"))) spice_parse_args(const char *in_arg // All SSL parameters should be either on or off. if (ssl_port != ssl_key || ssl_key != ssl_certs || ssl_certs != ssl_cafile || - ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) { + ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) { goto error; } @@ -4775,6 +4777,19 @@ static void interface_change_notifier(void *opaque, VDInterface *interface, return; } attach_to_red_agent((VDIPortInterface *)interface); + } else if (strcmp(interface->type, VD_INTERFACE_NET_WIRE) == 0) { + NetWireInterface * net_wire = (NetWireInterface *)interface; + red_printf("VD_INTERFACE_NET_WIRE"); + if (red_tunnel) { + red_printf("net wire already attached"); + return; + } + if (interface->major_version != VD_INTERFACE_NET_WIRE_MAJOR || + interface->minor_version < VD_INTERFACE_NET_WIRE_MINOR) { + red_printf("unsuported net wire interface"); + return; + } + red_tunnel = red_tunnel_attach(core, net_wire); } break; case VD_INTERFACE_REMOVING: diff --git a/server/reds.h b/server/reds.h index ce09e5b..248edff 100644 --- a/server/reds.h +++ b/server/reds.h @@ -27,6 +27,9 @@ typedef struct RedsStreamContext { int socket; + /* set it to TRUE if you shutdown the socket. shutdown read doesn't work as accepted - + receive may return data afterwards. check the flag before calling receive*/ + int shutdown; SSL *ssl; int (*cb_write)(void *, void *, int); diff --git a/server/vd_interface.h b/server/vd_interface.h index 932c0b1..12dbd5f 100644 --- a/server/vd_interface.h +++ b/server/vd_interface.h @@ -330,5 +330,23 @@ struct VDIPortInterface { int (*read)(VDIPortInterface *port, VDObjectRef plug, uint8_t *buf, int len); }; +#define VD_INTERFACE_NET_WIRE "net_wire" +#define VD_INTERFACE_NET_WIRE_MAJOR 1 +#define VD_INTERFACE_NET_WIRE_MINOR 1 + +typedef struct NetWireInterface NetWireInterface; +typedef void (*net_wire_packet_route_proc_t)(void *opaque, const uint8_t *pkt, int pkt_len); + +struct NetWireInterface { + VDInterface base; + + struct in_addr (*get_ip)(NetWireInterface *vlan); + int (*can_send_packet)(NetWireInterface *vlan); + void (*send_packet)(NetWireInterface *vlan, const uint8_t *buf, int size); + VDObjectRef (*register_route_packet)(NetWireInterface *vlan, net_wire_packet_route_proc_t proc, + void *opaque); + void (*unregister_route_packet)(NetWireInterface *vlan, VDObjectRef proc); +}; + #endif |