diff options
author | Alon Levy <alevy@redhat.com> | 2010-11-13 13:23:02 +0200 |
---|---|---|
committer | Alon Levy <alevy@redhat.com> | 2011-08-23 17:42:36 +0300 |
commit | 7e8e13593ee681cf04c349bca57dd225d7802494 (patch) | |
tree | f47be108ffe570dfa942502b8bad837551db4720 /server/red_channel.c | |
parent | 75b6a305ff9c42a89c9db91277027d5dc6d103ef (diff) |
server/red_channel (all): introduce RedChannelClient
This commit adds a RedChannelClient that now owns the stream connection,
but still doesn't own the pipe. There is only a single RCC per RC
right now (and RC still means RedChannel, RedClient will be introduced
later). All internal api changes are in server/red_channel.h, hence
the need to update all channels. red_worker.c is affected the most because
it makes use of direct access to some of RedChannel still.
API changes:
1. red_channel_client_create added.
rec_channel_create -> (red_channel_create, red_channel_client_create)
2. two way connection: rcc->channel, channel->rcc (later channel will
hold a list, and there will be a RedClient to hold the list of channels
per client)
3. seperation of channel disconnect and channel_client_disconnect
TODO:
usbredir added untested.
Diffstat (limited to 'server/red_channel.c')
-rw-r--r-- | server/red_channel.c | 627 |
1 files changed, 405 insertions, 222 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 9ecc7ef..8bbc6c9 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -33,8 +33,7 @@ #include "red_channel.h" #include "generated_marshallers.h" -static PipeItem *red_channel_pipe_get(RedChannel *channel); -static void red_channel_event(int fd, int event, void *data); +static void red_channel_client_event(int fd, int event, void *data); /* return the number of bytes read. -1 in case of error */ static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) @@ -152,9 +151,14 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle } } +void red_channel_client_receive(RedChannelClient *rcc) +{ + red_peer_handle_incoming(rcc->stream, &rcc->incoming); +} + void red_channel_receive(RedChannel *channel) { - red_peer_handle_incoming(channel->stream, &channel->incoming); + red_channel_client_receive(channel->rcc); } static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) @@ -201,124 +205,194 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle } } -static void red_channel_on_output(void *opaque, int n) +static void red_channel_client_on_output(void *opaque, int n) { - RedChannel *channel = opaque; + RedChannelClient *rcc = opaque; - stat_inc_counter(channel->out_bytes_counter, n); + stat_inc_counter(rcc->channel->out_bytes_counter, n); } -void red_channel_default_peer_on_error(RedChannel *channel) +void red_channel_client_default_peer_on_error(RedChannelClient *rcc) { - channel->disconnect(channel); + rcc->channel->disconnect(rcc); } -static void red_channel_peer_on_incoming_error(RedChannel *channel) +static void red_channel_peer_on_incoming_error(RedChannelClient *rcc) { - channel->on_incoming_error(channel); + rcc->channel->on_incoming_error(rcc); } -static void red_channel_peer_on_outgoing_error(RedChannel *channel) +static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc) { - channel->on_outgoing_error(channel); + rcc->channel->on_outgoing_error(rcc); } -static int red_channel_peer_get_out_msg_size(void *opaque) +static int red_channel_client_peer_get_out_msg_size(void *opaque) { - RedChannel *channel = (RedChannel *)opaque; + RedChannelClient *rcc = (RedChannelClient *)opaque; - return channel->send_data.size; + return rcc->send_data.size; } -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos) +static void red_channel_client_peer_prepare_out_msg( + void *opaque, struct iovec *vec, int *vec_size, int pos) { - RedChannel *channel = (RedChannel *)opaque; + RedChannelClient *rcc = (RedChannelClient *)opaque; - *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller, + *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, vec, MAX_SEND_VEC, pos); } -static void red_channel_peer_on_out_block(void *opaque) +static void red_channel_client_peer_on_out_block(void *opaque) { - RedChannel *channel = (RedChannel *)opaque; + RedChannelClient *rcc = (RedChannelClient *)opaque; - channel->send_data.blocked = TRUE; - channel->core->watch_update_mask(channel->stream->watch, + rcc->send_data.blocked = TRUE; + rcc->channel->core->watch_update_mask(rcc->stream->watch, SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE); } -static void red_channel_reset_send_data(RedChannel *channel) +static void red_channel_client_reset_send_data(RedChannelClient *rcc) +{ + spice_marshaller_reset(rcc->send_data.marshaller); + rcc->send_data.header = (SpiceDataHeader *) + spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader)); + spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader)); + rcc->send_data.header->type = 0; + rcc->send_data.header->size = 0; + rcc->send_data.header->sub_list = 0; + rcc->send_data.header->serial = ++rcc->send_data.serial; +} + +void red_channel_client_push_set_ack(RedChannelClient *rcc) { - spice_marshaller_reset(channel->send_data.marshaller); - channel->send_data.header = (SpiceDataHeader *) - spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader)); - spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader)); - channel->send_data.header->type = 0; - channel->send_data.header->size = 0; - channel->send_data.header->sub_list = 0; - channel->send_data.header->serial = ++channel->send_data.serial; + red_channel_pipe_add_type(rcc->channel, PIPE_ITEM_TYPE_SET_ACK); } void red_channel_push_set_ack(RedChannel *channel) { + // TODO - MC, should replace with add_type_all (or whatever I'll name it) red_channel_pipe_add_type(channel, PIPE_ITEM_TYPE_SET_ACK); } -static void red_channel_send_set_ack(RedChannel *channel) +static void red_channel_client_send_set_ack(RedChannelClient *rcc) { SpiceMsgSetAck ack; - ASSERT(channel); - red_channel_init_send_data(channel, SPICE_MSG_SET_ACK, NULL); - ack.generation = ++channel->ack_data.generation; - ack.window = channel->ack_data.client_window; - channel->ack_data.messages_window = 0; + ASSERT(rcc); + red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); + ack.generation = ++rcc->ack_data.generation; + ack.window = rcc->ack_data.client_window; + rcc->ack_data.messages_window = 0; - spice_marshall_msg_set_ack(channel->send_data.marshaller, &ack); + spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); - red_channel_begin_send_message(channel); + red_channel_client_begin_send_message(rcc); } -static void red_channel_send_item(RedChannel *channel, PipeItem *item) +static void red_channel_client_send_item(RedChannelClient *rcc, PipeItem *item) { - red_channel_reset_send_data(channel); + int handled = TRUE; + + ASSERT(red_channel_client_no_item_being_sent(rcc)); + red_channel_client_reset_send_data(rcc); switch (item->type) { case PIPE_ITEM_TYPE_SET_ACK: - red_channel_send_set_ack(channel); - return; + red_channel_client_send_set_ack(rcc); + break; + default: + handled = FALSE; + } + if (!handled) { + rcc->channel->send_item(rcc, item); } - /* only reached if not handled here */ - channel->send_item(channel, item); } -static void red_channel_release_item(RedChannel *channel, PipeItem *item, int item_pushed) +static void red_channel_client_release_item(RedChannelClient *rcc, PipeItem *item, int item_pushed) { + int handled = TRUE; + switch (item->type) { case PIPE_ITEM_TYPE_SET_ACK: free(item); - return; + break; + default: + handled = FALSE; + } + if (!handled) { + rcc->channel->release_item(rcc, item, item_pushed); } - /* only reached if not handled here */ - channel->release_item(channel, item, item_pushed); } -static void red_channel_peer_on_out_msg_done(void *opaque) +static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) { - RedChannel *channel = (RedChannel *)opaque; - channel->send_data.size = 0; - if (channel->send_data.item) { - red_channel_release_item(channel, channel->send_data.item, TRUE); - channel->send_data.item = NULL; + if (rcc->send_data.item) { + red_channel_client_release_item(rcc, + rcc->send_data.item, TRUE); + rcc->send_data.item = NULL; } - if (channel->send_data.blocked) { - channel->send_data.blocked = FALSE; - channel->core->watch_update_mask(channel->stream->watch, +} + +static void red_channel_peer_on_out_msg_done(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + rcc->send_data.size = 0; + red_channel_client_release_sent_item(rcc); + if (rcc->send_data.blocked) { + rcc->send_data.blocked = FALSE; + rcc->channel->core->watch_update_mask(rcc->stream->watch, SPICE_WATCH_EVENT_READ); } } -RedChannel *red_channel_create(int size, RedsStream *stream, +static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) +{ + ASSERT(rcc); + channel->rcc = rcc; +} + +RedChannelClient *red_channel_client_create( + int size, + RedChannel *channel, + RedsStream *stream) +{ + RedChannelClient *rcc = NULL; + + ASSERT(stream && channel && size >= sizeof(RedChannelClient)); + rcc = spice_malloc0(size); + rcc->stream = stream; + rcc->channel = channel; + rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + + // block flags) + rcc->ack_data.client_generation = ~0; + rcc->ack_data.client_window = CLIENT_ACK_WINDOW; + rcc->send_data.marshaller = spice_marshaller_new(); + + rcc->incoming.opaque = rcc; + rcc->incoming.cb = &channel->incoming_cb; + + rcc->outgoing.opaque = rcc; + rcc->outgoing.cb = &channel->outgoing_cb; + rcc->outgoing.pos = 0; + rcc->outgoing.size = 0; + if (!channel->config_socket(rcc)) { + goto error; + } + + stream->watch = channel->core->watch_add(stream->socket, + SPICE_WATCH_EVENT_READ, + red_channel_client_event, rcc); + red_channel_add_client(channel, rcc); + return rcc; +error: + free(rcc); + reds_stream_free(stream); + return NULL; +} + +RedChannel *red_channel_create(int size, SpiceCoreInterface *core, int migrate, int handle_acks, channel_configure_socket_proc config_socket, @@ -339,7 +413,6 @@ RedChannel *red_channel_create(int size, RedsStream *stream, ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf && release_item); channel = spice_malloc0(size); - channel->handle_acks = handle_acks; channel->disconnect = disconnect; channel->send_item = send_item; @@ -348,69 +421,40 @@ RedChannel *red_channel_create(int size, RedsStream *stream, channel->handle_migrate_flush_mark = handle_migrate_flush_mark; channel->handle_migrate_data = handle_migrate_data; channel->handle_migrate_data_get_serial = handle_migrate_data_get_serial; + channel->config_socket = config_socket; - channel->stream = stream; channel->core = core; - channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + - // block flags) - channel->ack_data.client_generation = ~0; - channel->ack_data.client_window = CLIENT_ACK_WINDOW; - channel->migrate = migrate; ring_init(&channel->pipe); - channel->send_data.marshaller = spice_marshaller_new(); - channel->incoming.opaque = channel; channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; channel->incoming_cb.handle_message = (handle_message_proc)handle_message; - channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error; - - channel->outgoing.opaque = channel; - channel->outgoing.pos = 0; - channel->outgoing.size = 0; - - channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size; - channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg; - channel->outgoing_cb.on_block = red_channel_peer_on_out_block; - channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; + channel->incoming_cb.on_error = + (on_incoming_error_proc)red_channel_client_default_peer_on_error; + channel->outgoing_cb.get_msg_size = red_channel_client_peer_get_out_msg_size; + channel->outgoing_cb.prepare = red_channel_client_peer_prepare_out_msg; + channel->outgoing_cb.on_block = red_channel_client_peer_on_out_block; + channel->outgoing_cb.on_error = + (on_outgoing_error_proc)red_channel_client_default_peer_on_error; channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done; - channel->outgoing_cb.on_output = red_channel_on_output; - - channel->incoming.cb = &channel->incoming_cb; - channel->outgoing.cb = &channel->outgoing_cb; + channel->outgoing_cb.on_output = red_channel_client_on_output; channel->shut = 0; // came here from inputs, perhaps can be removed? XXX channel->out_bytes_counter = 0; - - if (!config_socket(channel)) { - goto error; - } - - channel->stream->watch = channel->core->watch_add(channel->stream->socket, - SPICE_WATCH_EVENT_READ, - red_channel_event, channel); - return channel; - -error: - spice_marshaller_destroy(channel->send_data.marshaller); - free(channel); - reds_stream_free(stream); - - return NULL; } -static void do_nothing_disconnect(RedChannel *red_channel) +static void do_nothing_disconnect(RedChannelClient *rcc) { } -static int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header, uint8_t *msg) +static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg) { return TRUE; } -RedChannel *red_channel_create_parser(int size, RedsStream *stream, +RedChannel *red_channel_create_parser(int size, SpiceCoreInterface *core, int migrate, int handle_acks, channel_configure_socket_proc config_socket, @@ -427,7 +471,7 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream, channel_handle_migrate_data_proc handle_migrate_data, channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial) { - RedChannel *channel = red_channel_create(size, stream, + RedChannel *channel = red_channel_create(size, core, migrate, handle_acks, config_socket, do_nothing_disconnect, do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item, send_item, release_item, handle_migrate_flush_mark, handle_migrate_data, @@ -438,62 +482,152 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream, } channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; channel->incoming_cb.parser = parser; - channel->on_incoming_error = incoming_error; - channel->on_outgoing_error = outgoing_error; channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; + channel->on_incoming_error = incoming_error; + channel->on_outgoing_error = outgoing_error; return channel; } +void red_channel_client_destroy(RedChannelClient *rcc) +{ + red_channel_client_disconnect(rcc); + spice_marshaller_destroy(rcc->send_data.marshaller); + free(rcc); +} + void red_channel_destroy(RedChannel *channel) { if (!channel) { return; } - red_channel_pipe_clear(channel); - reds_stream_free(channel->stream); - spice_marshaller_destroy(channel->send_data.marshaller); + if (channel->rcc) { + red_channel_client_destroy(channel->rcc); + } free(channel); } +static void red_channel_client_shutdown(RedChannelClient *rcc) +{ + if (rcc->stream && !rcc->stream->shutdown) { + rcc->channel->core->watch_remove(rcc->stream->watch); + rcc->stream->watch = NULL; + shutdown(rcc->stream->socket, SHUT_RDWR); + rcc->stream->shutdown = TRUE; + rcc->incoming.shut = TRUE; + } + red_channel_client_release_sent_item(rcc); +} + void red_channel_shutdown(RedChannel *channel) { - red_printf(""); - if (channel->stream && !channel->stream->shutdown) { - channel->core->watch_update_mask(channel->stream->watch, - SPICE_WATCH_EVENT_READ); - red_channel_pipe_clear(channel); - shutdown(channel->stream->socket, SHUT_RDWR); - channel->stream->shutdown = TRUE; - channel->incoming.shut = TRUE; + if (channel->rcc) { + red_channel_client_shutdown(channel->rcc); + } + red_channel_pipe_clear(channel); +} + +void red_channel_client_send(RedChannelClient *rcc) +{ + red_peer_handle_outgoing(rcc->stream, &rcc->outgoing); +} + +void red_channel_send(RedChannel *channel) +{ + if (channel->rcc) { + red_channel_client_send(channel->rcc); } } +static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) +{ + return (rcc->channel->handle_acks && + (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); +} + +// TODO: add refs and target to PipeItem. Right now this only works for a +// single client (or actually, it's worse - first come first served) +static inline PipeItem *red_channel_client_pipe_get(RedChannelClient *rcc) +{ + PipeItem *item; + + if (!rcc || rcc->send_data.blocked + || red_channel_client_waiting_for_ack(rcc) + || !(item = (PipeItem *)ring_get_tail(&rcc->channel->pipe))) { + return NULL; + } + --rcc->channel->pipe_size; + ring_remove(&item->link); + return item; +} + +static void red_channel_client_push(RedChannelClient *rcc) +{ + PipeItem *pipe_item; + + if (!rcc->during_send) { + rcc->during_send = TRUE; + } else { + return; + } + + if (rcc->send_data.blocked) { + red_channel_client_send(rcc); + } + + while ((pipe_item = red_channel_client_pipe_get(rcc))) { + red_channel_client_send_item(rcc, pipe_item); + } + rcc->during_send = FALSE; +} + +void red_channel_push(RedChannel *channel) +{ + if (!channel || !channel->rcc) { + return; + } + red_channel_client_push(channel->rcc); +} + +static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) +{ + rcc->ack_data.messages_window = 0; + red_channel_client_push(rcc); +} + +// TODO: this function doesn't make sense because the window should be client (WAN/LAN) +// specific void red_channel_init_outgoing_messages_window(RedChannel *channel) { - channel->ack_data.messages_window = 0; - red_channel_push(channel); + red_channel_client_init_outgoing_messages_window(channel->rcc); } static void red_channel_handle_migrate_flush_mark(RedChannel *channel) { if (channel->handle_migrate_flush_mark) { - channel->handle_migrate_flush_mark(channel); + channel->handle_migrate_flush_mark(channel->rcc); } } -static void red_channel_handle_migrate_data(RedChannel *channel, uint32_t size, void *message) +// TODO: the whole migration is broken with multiple clients. What do we want to do? +// basically just +// 1) source send mark to all +// 2) source gets at various times the data (waits for all) +// 3) source migrates to target +// 4) target sends data to all +// So need to make all the handlers work with per channel/client data (what data exactly?) +static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) { - if (!channel->handle_migrate_data) { + if (!rcc->channel->handle_migrate_data) { return; } - ASSERT(red_channel_get_message_serial(channel) == 0); - red_channel_set_message_serial(channel, - channel->handle_migrate_data_get_serial(channel, size, message)); - channel->handle_migrate_data(channel, size, message); + ASSERT(red_channel_client_get_message_serial(rcc) == 0); + red_channel_client_set_message_serial(rcc, + rcc->channel->handle_migrate_data_get_serial(rcc, size, message)); + rcc->channel->handle_migrate_data(rcc, size, message); } -int red_channel_handle_message(RedChannel *channel, uint32_t size, +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message) { switch (type) { @@ -502,21 +636,21 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size, red_printf("bad message size"); return FALSE; } - channel->ack_data.client_generation = *(uint32_t *)(message); + rcc->ack_data.client_generation = *(uint32_t *)(message); break; case SPICE_MSGC_ACK: - if (channel->ack_data.client_generation == channel->ack_data.generation) { - channel->ack_data.messages_window -= channel->ack_data.client_window; - red_channel_push(channel); + if (rcc->ack_data.client_generation == rcc->ack_data.generation) { + rcc->ack_data.messages_window -= rcc->ack_data.client_window; + red_channel_client_push(rcc); } break; case SPICE_MSGC_DISCONNECTING: break; case SPICE_MSGC_MIGRATE_FLUSH_MARK: - red_channel_handle_migrate_flush_mark(channel); + red_channel_handle_migrate_flush_mark(rcc->channel); break; case SPICE_MSGC_MIGRATE_DATA: - red_channel_handle_migrate_data(channel, size, message); + red_channel_handle_migrate_data(rcc, size, message); break; default: red_printf("invalid message type %u", type); @@ -525,75 +659,54 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size, return TRUE; } -static void red_channel_event(int fd, int event, void *data) +static void red_channel_client_event(int fd, int event, void *data) { - RedChannel *channel = (RedChannel *)data; + RedChannelClient *rcc = (RedChannelClient *)data; if (event & SPICE_WATCH_EVENT_READ) { - red_channel_receive(channel); + red_channel_client_receive(rcc); } if (event & SPICE_WATCH_EVENT_WRITE) { - red_channel_push(channel); + red_channel_client_push(rcc); } } -void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item) +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item) { - ASSERT(channel->send_data.item == NULL); - channel->send_data.header->type = msg_type; - channel->send_data.item = item; + ASSERT(red_channel_client_no_item_being_sent(rcc)); + ASSERT(msg_type != 0); + rcc->send_data.header->type = msg_type; + rcc->send_data.item = item; if (item) { - channel->hold_item(channel, item); + rcc->channel->hold_item(rcc, item); } } -void red_channel_send(RedChannel *channel) -{ - red_peer_handle_outgoing(channel->stream, &channel->outgoing); -} - -void red_channel_begin_send_message(RedChannel *channel) -{ - spice_marshaller_flush(channel->send_data.marshaller); - channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller); - channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader); - channel->ack_data.messages_window++; - channel->send_data.header = NULL; /* avoid writing to this until we have a new message */ - red_channel_send(channel); -} - -void red_channel_push(RedChannel *channel) +void red_channel_client_begin_send_message(RedChannelClient *rcc) { - PipeItem *pipe_item; - - if (!channel) { - return; - } + SpiceMarshaller *m = rcc->send_data.marshaller; - if (!channel->during_send) { - channel->during_send = TRUE; - } else { + // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state) + if (rcc->send_data.header->type == 0) { + red_printf("BUG: header->type == 0"); return; } - - if (channel->send_data.blocked) { - red_channel_send(channel); - } - - while ((pipe_item = red_channel_pipe_get(channel))) { - red_channel_send_item(channel, pipe_item); - } - channel->during_send = FALSE; + spice_marshaller_flush(m); + rcc->send_data.size = spice_marshaller_get_total_size(m); + rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader); + rcc->ack_data.messages_window++; + rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */ + red_channel_client_send(rcc); } -uint64_t red_channel_get_message_serial(RedChannel *channel) +uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) { - return channel->send_data.serial; + return rcc->send_data.serial; } -void red_channel_set_message_serial(RedChannel *channel, uint64_t serial) +void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) { - channel->send_data.serial = serial; + rcc->send_data.serial = serial; } void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type) @@ -657,28 +770,19 @@ void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type) red_channel_push(channel); } -static inline int red_channel_waiting_for_ack(RedChannel *channel) +int red_channel_is_connected(RedChannel *channel) { - return (channel->handle_acks && (channel->ack_data.messages_window > channel->ack_data.client_window * 2)); + return channel->rcc != NULL; } -static inline PipeItem *red_channel_pipe_get(RedChannel *channel) +void red_channel_client_clear_sent_item(RedChannelClient *rcc) { - PipeItem *item; - - if (!channel || channel->send_data.blocked || - red_channel_waiting_for_ack(channel) || - !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { - return NULL; + if (rcc->send_data.item) { + red_channel_client_release_item(rcc, rcc->send_data.item, TRUE); + rcc->send_data.item = NULL; } - --channel->pipe_size; - ring_remove(&item->link); - return item; -} - -int red_channel_is_connected(RedChannel *channel) -{ - return !!channel->stream; + rcc->send_data.blocked = FALSE; + rcc->send_data.size = 0; } void red_channel_pipe_clear(RedChannel *channel) @@ -686,82 +790,161 @@ void red_channel_pipe_clear(RedChannel *channel) PipeItem *item; ASSERT(channel); - if (channel->send_data.item) { - red_channel_release_item(channel, channel->send_data.item, TRUE); - channel->send_data.item = NULL; + if (channel->rcc) { + red_channel_client_clear_sent_item(channel->rcc); } while ((item = (PipeItem *)ring_get_head(&channel->pipe))) { ring_remove(&item->link); - red_channel_release_item(channel, item, FALSE); + red_channel_client_release_item(channel->rcc, item, FALSE); } channel->pipe_size = 0; } +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) +{ + rcc->ack_data.messages_window = 0; +} + void red_channel_ack_zero_messages_window(RedChannel *channel) { - channel->ack_data.messages_window = 0; + red_channel_client_ack_zero_messages_window(channel->rcc); } -void red_channel_ack_set_client_window(RedChannel *channel, int client_window) +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window) { - channel->ack_data.client_window = client_window; + rcc->ack_data.client_window = client_window; +} + +void red_channel_ack_set_client_window(RedChannel* channel, int client_window) +{ + if (channel->rcc) { + red_channel_client_ack_set_client_window(channel->rcc, client_window); + } +} + +void red_channel_client_disconnect(RedChannelClient *rcc) +{ + red_printf("%p (channel %p)", rcc, rcc->channel); + + if (rcc->send_data.item) { + rcc->channel->release_item(rcc, rcc->send_data.item, FALSE); + } + // TODO: clear our references from the pipe + reds_stream_free(rcc->stream); + rcc->send_data.item = NULL; + rcc->send_data.blocked = FALSE; + rcc->send_data.size = 0; + rcc->channel->rcc = NULL; +} + +void red_channel_disconnect(RedChannel *channel) +{ + red_channel_pipe_clear(channel); + if (channel->rcc) { + red_channel_client_disconnect(channel->rcc); + } +} + +int red_channel_all_clients_serials_are_zero(RedChannel *channel) +{ + return (!channel->rcc || channel->rcc->send_data.serial == 0); +} + +void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v) +{ + if (channel->rcc) { + v(channel->rcc); + } +} + +void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data) +{ + if (channel->rcc) { + v(channel->rcc, data); + } +} + +void red_channel_set_shut(RedChannel *channel) +{ + if (channel->rcc) { + channel->rcc->incoming.shut = TRUE; + } } int red_channel_all_blocked(RedChannel *channel) { - return channel->send_data.blocked; + return !channel || !channel->rcc || channel->rcc->send_data.blocked; } int red_channel_any_blocked(RedChannel *channel) { - return channel->send_data.blocked; + return !channel || !channel->rcc || channel->rcc->send_data.blocked; } -int red_channel_send_message_pending(RedChannel *channel) +int red_channel_client_blocked(RedChannelClient *rcc) { - return channel->send_data.header->type != 0; + return rcc && rcc->send_data.blocked; } -/* accessors for RedChannel */ -SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel) +int red_channel_client_send_message_pending(RedChannelClient *rcc) { - return channel->send_data.marshaller; + return rcc->send_data.header->type != 0; } -RedsStream *red_channel_get_stream(RedChannel *channel) +/* accessors for RedChannelClient */ +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) { - return channel->stream; + return rcc->send_data.marshaller; } -SpiceDataHeader *red_channel_get_header(RedChannel *channel) +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) { - return channel->send_data.header; + return rcc->stream; +} + +SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc) +{ + return rcc->send_data.header; } /* end of accessors */ int red_channel_get_first_socket(RedChannel *channel) { - if (!channel->stream) { + if (!channel->rcc || !channel->rcc->stream) { return -1; } - return channel->stream->socket; + return channel->rcc->stream->socket; +} + +int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item) +{ + return rcc->send_data.item == item; } int red_channel_item_being_sent(RedChannel *channel, PipeItem *item) { - return channel->send_data.item == item; + return channel->rcc && red_channel_client_item_being_sent(channel->rcc, item); } int red_channel_no_item_being_sent(RedChannel *channel) { - return channel->send_data.item == NULL; + return !channel->rcc || red_channel_client_no_item_being_sent(channel->rcc); } -void red_channel_disconnect(RedChannel *channel) +int red_channel_client_no_item_being_sent(RedChannelClient *rcc) { - red_channel_pipe_clear(channel); - reds_stream_free(channel->stream); - channel->stream = NULL; - channel->send_data.blocked = FALSE; - channel->send_data.size = 0; + return !rcc || (rcc->send_data.size == 0); +} + +static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item) +{ + rcc->channel->pipe_size--; + ring_remove(&item->link); +} + +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, + PipeItem *item) +{ + red_channel_client_pipe_remove(rcc, item); + red_channel_client_release_item(rcc, item, FALSE); } |