summaryrefslogtreecommitdiff
path: root/server/red_channel.c
diff options
context:
space:
mode:
authorAlon Levy <alevy@redhat.com>2010-11-13 13:23:02 +0200
committerAlon Levy <alevy@redhat.com>2011-08-23 17:42:36 +0300
commit7e8e13593ee681cf04c349bca57dd225d7802494 (patch)
treef47be108ffe570dfa942502b8bad837551db4720 /server/red_channel.c
parent75b6a305ff9c42a89c9db91277027d5dc6d103ef (diff)
server/red_channel (all): introduce RedChannelClient
This commit adds a RedChannelClient that now owns the stream connection, but still doesn't own the pipe. There is only a single RCC per RC right now (and RC still means RedChannel, RedClient will be introduced later). All internal api changes are in server/red_channel.h, hence the need to update all channels. red_worker.c is affected the most because it makes use of direct access to some of RedChannel still. API changes: 1. red_channel_client_create added. rec_channel_create -> (red_channel_create, red_channel_client_create) 2. two way connection: rcc->channel, channel->rcc (later channel will hold a list, and there will be a RedClient to hold the list of channels per client) 3. seperation of channel disconnect and channel_client_disconnect TODO: usbredir added untested.
Diffstat (limited to 'server/red_channel.c')
-rw-r--r--server/red_channel.c627
1 files changed, 405 insertions, 222 deletions
diff --git a/server/red_channel.c b/server/red_channel.c
index 9ecc7ef..8bbc6c9 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -33,8 +33,7 @@
#include "red_channel.h"
#include "generated_marshallers.h"
-static PipeItem *red_channel_pipe_get(RedChannel *channel);
-static void red_channel_event(int fd, int event, void *data);
+static void red_channel_client_event(int fd, int event, void *data);
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -152,9 +151,14 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
}
}
+void red_channel_client_receive(RedChannelClient *rcc)
+{
+ red_peer_handle_incoming(rcc->stream, &rcc->incoming);
+}
+
void red_channel_receive(RedChannel *channel)
{
- red_peer_handle_incoming(channel->stream, &channel->incoming);
+ red_channel_client_receive(channel->rcc);
}
static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
@@ -201,124 +205,194 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle
}
}
-static void red_channel_on_output(void *opaque, int n)
+static void red_channel_client_on_output(void *opaque, int n)
{
- RedChannel *channel = opaque;
+ RedChannelClient *rcc = opaque;
- stat_inc_counter(channel->out_bytes_counter, n);
+ stat_inc_counter(rcc->channel->out_bytes_counter, n);
}
-void red_channel_default_peer_on_error(RedChannel *channel)
+void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
{
- channel->disconnect(channel);
+ rcc->channel->disconnect(rcc);
}
-static void red_channel_peer_on_incoming_error(RedChannel *channel)
+static void red_channel_peer_on_incoming_error(RedChannelClient *rcc)
{
- channel->on_incoming_error(channel);
+ rcc->channel->on_incoming_error(rcc);
}
-static void red_channel_peer_on_outgoing_error(RedChannel *channel)
+static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc)
{
- channel->on_outgoing_error(channel);
+ rcc->channel->on_outgoing_error(rcc);
}
-static int red_channel_peer_get_out_msg_size(void *opaque)
+static int red_channel_client_peer_get_out_msg_size(void *opaque)
{
- RedChannel *channel = (RedChannel *)opaque;
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
- return channel->send_data.size;
+ return rcc->send_data.size;
}
-static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
+static void red_channel_client_peer_prepare_out_msg(
+ void *opaque, struct iovec *vec, int *vec_size, int pos)
{
- RedChannel *channel = (RedChannel *)opaque;
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
- *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
+ *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
vec, MAX_SEND_VEC, pos);
}
-static void red_channel_peer_on_out_block(void *opaque)
+static void red_channel_client_peer_on_out_block(void *opaque)
{
- RedChannel *channel = (RedChannel *)opaque;
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
- channel->send_data.blocked = TRUE;
- channel->core->watch_update_mask(channel->stream->watch,
+ rcc->send_data.blocked = TRUE;
+ rcc->channel->core->watch_update_mask(rcc->stream->watch,
SPICE_WATCH_EVENT_READ |
SPICE_WATCH_EVENT_WRITE);
}
-static void red_channel_reset_send_data(RedChannel *channel)
+static void red_channel_client_reset_send_data(RedChannelClient *rcc)
+{
+ spice_marshaller_reset(rcc->send_data.marshaller);
+ rcc->send_data.header = (SpiceDataHeader *)
+ spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
+ spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
+ rcc->send_data.header->type = 0;
+ rcc->send_data.header->size = 0;
+ rcc->send_data.header->sub_list = 0;
+ rcc->send_data.header->serial = ++rcc->send_data.serial;
+}
+
+void red_channel_client_push_set_ack(RedChannelClient *rcc)
{
- spice_marshaller_reset(channel->send_data.marshaller);
- channel->send_data.header = (SpiceDataHeader *)
- spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
- spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
- channel->send_data.header->type = 0;
- channel->send_data.header->size = 0;
- channel->send_data.header->sub_list = 0;
- channel->send_data.header->serial = ++channel->send_data.serial;
+ red_channel_pipe_add_type(rcc->channel, PIPE_ITEM_TYPE_SET_ACK);
}
void red_channel_push_set_ack(RedChannel *channel)
{
+ // TODO - MC, should replace with add_type_all (or whatever I'll name it)
red_channel_pipe_add_type(channel, PIPE_ITEM_TYPE_SET_ACK);
}
-static void red_channel_send_set_ack(RedChannel *channel)
+static void red_channel_client_send_set_ack(RedChannelClient *rcc)
{
SpiceMsgSetAck ack;
- ASSERT(channel);
- red_channel_init_send_data(channel, SPICE_MSG_SET_ACK, NULL);
- ack.generation = ++channel->ack_data.generation;
- ack.window = channel->ack_data.client_window;
- channel->ack_data.messages_window = 0;
+ ASSERT(rcc);
+ red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
+ ack.generation = ++rcc->ack_data.generation;
+ ack.window = rcc->ack_data.client_window;
+ rcc->ack_data.messages_window = 0;
- spice_marshall_msg_set_ack(channel->send_data.marshaller, &ack);
+ spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
- red_channel_begin_send_message(channel);
+ red_channel_client_begin_send_message(rcc);
}
-static void red_channel_send_item(RedChannel *channel, PipeItem *item)
+static void red_channel_client_send_item(RedChannelClient *rcc, PipeItem *item)
{
- red_channel_reset_send_data(channel);
+ int handled = TRUE;
+
+ ASSERT(red_channel_client_no_item_being_sent(rcc));
+ red_channel_client_reset_send_data(rcc);
switch (item->type) {
case PIPE_ITEM_TYPE_SET_ACK:
- red_channel_send_set_ack(channel);
- return;
+ red_channel_client_send_set_ack(rcc);
+ break;
+ default:
+ handled = FALSE;
+ }
+ if (!handled) {
+ rcc->channel->send_item(rcc, item);
}
- /* only reached if not handled here */
- channel->send_item(channel, item);
}
-static void red_channel_release_item(RedChannel *channel, PipeItem *item, int item_pushed)
+static void red_channel_client_release_item(RedChannelClient *rcc, PipeItem *item, int item_pushed)
{
+ int handled = TRUE;
+
switch (item->type) {
case PIPE_ITEM_TYPE_SET_ACK:
free(item);
- return;
+ break;
+ default:
+ handled = FALSE;
+ }
+ if (!handled) {
+ rcc->channel->release_item(rcc, item, item_pushed);
}
- /* only reached if not handled here */
- channel->release_item(channel, item, item_pushed);
}
-static void red_channel_peer_on_out_msg_done(void *opaque)
+static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
{
- RedChannel *channel = (RedChannel *)opaque;
- channel->send_data.size = 0;
- if (channel->send_data.item) {
- red_channel_release_item(channel, channel->send_data.item, TRUE);
- channel->send_data.item = NULL;
+ if (rcc->send_data.item) {
+ red_channel_client_release_item(rcc,
+ rcc->send_data.item, TRUE);
+ rcc->send_data.item = NULL;
}
- if (channel->send_data.blocked) {
- channel->send_data.blocked = FALSE;
- channel->core->watch_update_mask(channel->stream->watch,
+}
+
+static void red_channel_peer_on_out_msg_done(void *opaque)
+{
+ RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+ rcc->send_data.size = 0;
+ red_channel_client_release_sent_item(rcc);
+ if (rcc->send_data.blocked) {
+ rcc->send_data.blocked = FALSE;
+ rcc->channel->core->watch_update_mask(rcc->stream->watch,
SPICE_WATCH_EVENT_READ);
}
}
-RedChannel *red_channel_create(int size, RedsStream *stream,
+static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
+{
+ ASSERT(rcc);
+ channel->rcc = rcc;
+}
+
+RedChannelClient *red_channel_client_create(
+ int size,
+ RedChannel *channel,
+ RedsStream *stream)
+{
+ RedChannelClient *rcc = NULL;
+
+ ASSERT(stream && channel && size >= sizeof(RedChannelClient));
+ rcc = spice_malloc0(size);
+ rcc->stream = stream;
+ rcc->channel = channel;
+ rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
+ // block flags)
+ rcc->ack_data.client_generation = ~0;
+ rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
+ rcc->send_data.marshaller = spice_marshaller_new();
+
+ rcc->incoming.opaque = rcc;
+ rcc->incoming.cb = &channel->incoming_cb;
+
+ rcc->outgoing.opaque = rcc;
+ rcc->outgoing.cb = &channel->outgoing_cb;
+ rcc->outgoing.pos = 0;
+ rcc->outgoing.size = 0;
+ if (!channel->config_socket(rcc)) {
+ goto error;
+ }
+
+ stream->watch = channel->core->watch_add(stream->socket,
+ SPICE_WATCH_EVENT_READ,
+ red_channel_client_event, rcc);
+ red_channel_add_client(channel, rcc);
+ return rcc;
+error:
+ free(rcc);
+ reds_stream_free(stream);
+ return NULL;
+}
+
+RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
@@ -339,7 +413,6 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf &&
release_item);
channel = spice_malloc0(size);
-
channel->handle_acks = handle_acks;
channel->disconnect = disconnect;
channel->send_item = send_item;
@@ -348,69 +421,40 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
channel->handle_migrate_flush_mark = handle_migrate_flush_mark;
channel->handle_migrate_data = handle_migrate_data;
channel->handle_migrate_data_get_serial = handle_migrate_data_get_serial;
+ channel->config_socket = config_socket;
- channel->stream = stream;
channel->core = core;
- channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
- // block flags)
- channel->ack_data.client_generation = ~0;
- channel->ack_data.client_window = CLIENT_ACK_WINDOW;
-
channel->migrate = migrate;
ring_init(&channel->pipe);
- channel->send_data.marshaller = spice_marshaller_new();
- channel->incoming.opaque = channel;
channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
channel->incoming_cb.handle_message = (handle_message_proc)handle_message;
- channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
-
- channel->outgoing.opaque = channel;
- channel->outgoing.pos = 0;
- channel->outgoing.size = 0;
-
- channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size;
- channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg;
- channel->outgoing_cb.on_block = red_channel_peer_on_out_block;
- channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
+ channel->incoming_cb.on_error =
+ (on_incoming_error_proc)red_channel_client_default_peer_on_error;
+ channel->outgoing_cb.get_msg_size = red_channel_client_peer_get_out_msg_size;
+ channel->outgoing_cb.prepare = red_channel_client_peer_prepare_out_msg;
+ channel->outgoing_cb.on_block = red_channel_client_peer_on_out_block;
+ channel->outgoing_cb.on_error =
+ (on_outgoing_error_proc)red_channel_client_default_peer_on_error;
channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done;
- channel->outgoing_cb.on_output = red_channel_on_output;
-
- channel->incoming.cb = &channel->incoming_cb;
- channel->outgoing.cb = &channel->outgoing_cb;
+ channel->outgoing_cb.on_output = red_channel_client_on_output;
channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
channel->out_bytes_counter = 0;
-
- if (!config_socket(channel)) {
- goto error;
- }
-
- channel->stream->watch = channel->core->watch_add(channel->stream->socket,
- SPICE_WATCH_EVENT_READ,
- red_channel_event, channel);
-
return channel;
-
-error:
- spice_marshaller_destroy(channel->send_data.marshaller);
- free(channel);
- reds_stream_free(stream);
-
- return NULL;
}
-static void do_nothing_disconnect(RedChannel *red_channel)
+static void do_nothing_disconnect(RedChannelClient *rcc)
{
}
-static int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header, uint8_t *msg)
+static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
{
return TRUE;
}
-RedChannel *red_channel_create_parser(int size, RedsStream *stream,
+RedChannel *red_channel_create_parser(int size,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
@@ -427,7 +471,7 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
channel_handle_migrate_data_proc handle_migrate_data,
channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
{
- RedChannel *channel = red_channel_create(size, stream,
+ RedChannel *channel = red_channel_create(size,
core, migrate, handle_acks, config_socket, do_nothing_disconnect,
do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item,
send_item, release_item, handle_migrate_flush_mark, handle_migrate_data,
@@ -438,62 +482,152 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
}
channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
channel->incoming_cb.parser = parser;
- channel->on_incoming_error = incoming_error;
- channel->on_outgoing_error = outgoing_error;
channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
+ channel->on_incoming_error = incoming_error;
+ channel->on_outgoing_error = outgoing_error;
return channel;
}
+void red_channel_client_destroy(RedChannelClient *rcc)
+{
+ red_channel_client_disconnect(rcc);
+ spice_marshaller_destroy(rcc->send_data.marshaller);
+ free(rcc);
+}
+
void red_channel_destroy(RedChannel *channel)
{
if (!channel) {
return;
}
- red_channel_pipe_clear(channel);
- reds_stream_free(channel->stream);
- spice_marshaller_destroy(channel->send_data.marshaller);
+ if (channel->rcc) {
+ red_channel_client_destroy(channel->rcc);
+ }
free(channel);
}
+static void red_channel_client_shutdown(RedChannelClient *rcc)
+{
+ if (rcc->stream && !rcc->stream->shutdown) {
+ rcc->channel->core->watch_remove(rcc->stream->watch);
+ rcc->stream->watch = NULL;
+ shutdown(rcc->stream->socket, SHUT_RDWR);
+ rcc->stream->shutdown = TRUE;
+ rcc->incoming.shut = TRUE;
+ }
+ red_channel_client_release_sent_item(rcc);
+}
+
void red_channel_shutdown(RedChannel *channel)
{
- red_printf("");
- if (channel->stream && !channel->stream->shutdown) {
- channel->core->watch_update_mask(channel->stream->watch,
- SPICE_WATCH_EVENT_READ);
- red_channel_pipe_clear(channel);
- shutdown(channel->stream->socket, SHUT_RDWR);
- channel->stream->shutdown = TRUE;
- channel->incoming.shut = TRUE;
+ if (channel->rcc) {
+ red_channel_client_shutdown(channel->rcc);
+ }
+ red_channel_pipe_clear(channel);
+}
+
+void red_channel_client_send(RedChannelClient *rcc)
+{
+ red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
+}
+
+void red_channel_send(RedChannel *channel)
+{
+ if (channel->rcc) {
+ red_channel_client_send(channel->rcc);
}
}
+static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
+{
+ return (rcc->channel->handle_acks &&
+ (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
+}
+
+// TODO: add refs and target to PipeItem. Right now this only works for a
+// single client (or actually, it's worse - first come first served)
+static inline PipeItem *red_channel_client_pipe_get(RedChannelClient *rcc)
+{
+ PipeItem *item;
+
+ if (!rcc || rcc->send_data.blocked
+ || red_channel_client_waiting_for_ack(rcc)
+ || !(item = (PipeItem *)ring_get_tail(&rcc->channel->pipe))) {
+ return NULL;
+ }
+ --rcc->channel->pipe_size;
+ ring_remove(&item->link);
+ return item;
+}
+
+static void red_channel_client_push(RedChannelClient *rcc)
+{
+ PipeItem *pipe_item;
+
+ if (!rcc->during_send) {
+ rcc->during_send = TRUE;
+ } else {
+ return;
+ }
+
+ if (rcc->send_data.blocked) {
+ red_channel_client_send(rcc);
+ }
+
+ while ((pipe_item = red_channel_client_pipe_get(rcc))) {
+ red_channel_client_send_item(rcc, pipe_item);
+ }
+ rcc->during_send = FALSE;
+}
+
+void red_channel_push(RedChannel *channel)
+{
+ if (!channel || !channel->rcc) {
+ return;
+ }
+ red_channel_client_push(channel->rcc);
+}
+
+static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
+{
+ rcc->ack_data.messages_window = 0;
+ red_channel_client_push(rcc);
+}
+
+// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
+// specific
void red_channel_init_outgoing_messages_window(RedChannel *channel)
{
- channel->ack_data.messages_window = 0;
- red_channel_push(channel);
+ red_channel_client_init_outgoing_messages_window(channel->rcc);
}
static void red_channel_handle_migrate_flush_mark(RedChannel *channel)
{
if (channel->handle_migrate_flush_mark) {
- channel->handle_migrate_flush_mark(channel);
+ channel->handle_migrate_flush_mark(channel->rcc);
}
}
-static void red_channel_handle_migrate_data(RedChannel *channel, uint32_t size, void *message)
+// TODO: the whole migration is broken with multiple clients. What do we want to do?
+// basically just
+// 1) source send mark to all
+// 2) source gets at various times the data (waits for all)
+// 3) source migrates to target
+// 4) target sends data to all
+// So need to make all the handlers work with per channel/client data (what data exactly?)
+static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
{
- if (!channel->handle_migrate_data) {
+ if (!rcc->channel->handle_migrate_data) {
return;
}
- ASSERT(red_channel_get_message_serial(channel) == 0);
- red_channel_set_message_serial(channel,
- channel->handle_migrate_data_get_serial(channel, size, message));
- channel->handle_migrate_data(channel, size, message);
+ ASSERT(red_channel_client_get_message_serial(rcc) == 0);
+ red_channel_client_set_message_serial(rcc,
+ rcc->channel->handle_migrate_data_get_serial(rcc, size, message));
+ rcc->channel->handle_migrate_data(rcc, size, message);
}
-int red_channel_handle_message(RedChannel *channel, uint32_t size,
+int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
uint16_t type, void *message)
{
switch (type) {
@@ -502,21 +636,21 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size,
red_printf("bad message size");
return FALSE;
}
- channel->ack_data.client_generation = *(uint32_t *)(message);
+ rcc->ack_data.client_generation = *(uint32_t *)(message);
break;
case SPICE_MSGC_ACK:
- if (channel->ack_data.client_generation == channel->ack_data.generation) {
- channel->ack_data.messages_window -= channel->ack_data.client_window;
- red_channel_push(channel);
+ if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
+ rcc->ack_data.messages_window -= rcc->ack_data.client_window;
+ red_channel_client_push(rcc);
}
break;
case SPICE_MSGC_DISCONNECTING:
break;
case SPICE_MSGC_MIGRATE_FLUSH_MARK:
- red_channel_handle_migrate_flush_mark(channel);
+ red_channel_handle_migrate_flush_mark(rcc->channel);
break;
case SPICE_MSGC_MIGRATE_DATA:
- red_channel_handle_migrate_data(channel, size, message);
+ red_channel_handle_migrate_data(rcc, size, message);
break;
default:
red_printf("invalid message type %u", type);
@@ -525,75 +659,54 @@ int red_channel_handle_message(RedChannel *channel, uint32_t size,
return TRUE;
}
-static void red_channel_event(int fd, int event, void *data)
+static void red_channel_client_event(int fd, int event, void *data)
{
- RedChannel *channel = (RedChannel *)data;
+ RedChannelClient *rcc = (RedChannelClient *)data;
if (event & SPICE_WATCH_EVENT_READ) {
- red_channel_receive(channel);
+ red_channel_client_receive(rcc);
}
if (event & SPICE_WATCH_EVENT_WRITE) {
- red_channel_push(channel);
+ red_channel_client_push(rcc);
}
}
-void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
+void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item)
{
- ASSERT(channel->send_data.item == NULL);
- channel->send_data.header->type = msg_type;
- channel->send_data.item = item;
+ ASSERT(red_channel_client_no_item_being_sent(rcc));
+ ASSERT(msg_type != 0);
+ rcc->send_data.header->type = msg_type;
+ rcc->send_data.item = item;
if (item) {
- channel->hold_item(channel, item);
+ rcc->channel->hold_item(rcc, item);
}
}
-void red_channel_send(RedChannel *channel)
-{
- red_peer_handle_outgoing(channel->stream, &channel->outgoing);
-}
-
-void red_channel_begin_send_message(RedChannel *channel)
-{
- spice_marshaller_flush(channel->send_data.marshaller);
- channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
- channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader);
- channel->ack_data.messages_window++;
- channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
- red_channel_send(channel);
-}
-
-void red_channel_push(RedChannel *channel)
+void red_channel_client_begin_send_message(RedChannelClient *rcc)
{
- PipeItem *pipe_item;
-
- if (!channel) {
- return;
- }
+ SpiceMarshaller *m = rcc->send_data.marshaller;
- if (!channel->during_send) {
- channel->during_send = TRUE;
- } else {
+ // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
+ if (rcc->send_data.header->type == 0) {
+ red_printf("BUG: header->type == 0");
return;
}
-
- if (channel->send_data.blocked) {
- red_channel_send(channel);
- }
-
- while ((pipe_item = red_channel_pipe_get(channel))) {
- red_channel_send_item(channel, pipe_item);
- }
- channel->during_send = FALSE;
+ spice_marshaller_flush(m);
+ rcc->send_data.size = spice_marshaller_get_total_size(m);
+ rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader);
+ rcc->ack_data.messages_window++;
+ rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */
+ red_channel_client_send(rcc);
}
-uint64_t red_channel_get_message_serial(RedChannel *channel)
+uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
{
- return channel->send_data.serial;
+ return rcc->send_data.serial;
}
-void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
+void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
{
- channel->send_data.serial = serial;
+ rcc->send_data.serial = serial;
}
void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
@@ -657,28 +770,19 @@ void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type)
red_channel_push(channel);
}
-static inline int red_channel_waiting_for_ack(RedChannel *channel)
+int red_channel_is_connected(RedChannel *channel)
{
- return (channel->handle_acks && (channel->ack_data.messages_window > channel->ack_data.client_window * 2));
+ return channel->rcc != NULL;
}
-static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
+void red_channel_client_clear_sent_item(RedChannelClient *rcc)
{
- PipeItem *item;
-
- if (!channel || channel->send_data.blocked ||
- red_channel_waiting_for_ack(channel) ||
- !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
- return NULL;
+ if (rcc->send_data.item) {
+ red_channel_client_release_item(rcc, rcc->send_data.item, TRUE);
+ rcc->send_data.item = NULL;
}
- --channel->pipe_size;
- ring_remove(&item->link);
- return item;
-}
-
-int red_channel_is_connected(RedChannel *channel)
-{
- return !!channel->stream;
+ rcc->send_data.blocked = FALSE;
+ rcc->send_data.size = 0;
}
void red_channel_pipe_clear(RedChannel *channel)
@@ -686,82 +790,161 @@ void red_channel_pipe_clear(RedChannel *channel)
PipeItem *item;
ASSERT(channel);
- if (channel->send_data.item) {
- red_channel_release_item(channel, channel->send_data.item, TRUE);
- channel->send_data.item = NULL;
+ if (channel->rcc) {
+ red_channel_client_clear_sent_item(channel->rcc);
}
while ((item = (PipeItem *)ring_get_head(&channel->pipe))) {
ring_remove(&item->link);
- red_channel_release_item(channel, item, FALSE);
+ red_channel_client_release_item(channel->rcc, item, FALSE);
}
channel->pipe_size = 0;
}
+void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
+{
+ rcc->ack_data.messages_window = 0;
+}
+
void red_channel_ack_zero_messages_window(RedChannel *channel)
{
- channel->ack_data.messages_window = 0;
+ red_channel_client_ack_zero_messages_window(channel->rcc);
}
-void red_channel_ack_set_client_window(RedChannel *channel, int client_window)
+void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
{
- channel->ack_data.client_window = client_window;
+ rcc->ack_data.client_window = client_window;
+}
+
+void red_channel_ack_set_client_window(RedChannel* channel, int client_window)
+{
+ if (channel->rcc) {
+ red_channel_client_ack_set_client_window(channel->rcc, client_window);
+ }
+}
+
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+ red_printf("%p (channel %p)", rcc, rcc->channel);
+
+ if (rcc->send_data.item) {
+ rcc->channel->release_item(rcc, rcc->send_data.item, FALSE);
+ }
+ // TODO: clear our references from the pipe
+ reds_stream_free(rcc->stream);
+ rcc->send_data.item = NULL;
+ rcc->send_data.blocked = FALSE;
+ rcc->send_data.size = 0;
+ rcc->channel->rcc = NULL;
+}
+
+void red_channel_disconnect(RedChannel *channel)
+{
+ red_channel_pipe_clear(channel);
+ if (channel->rcc) {
+ red_channel_client_disconnect(channel->rcc);
+ }
+}
+
+int red_channel_all_clients_serials_are_zero(RedChannel *channel)
+{
+ return (!channel->rcc || channel->rcc->send_data.serial == 0);
+}
+
+void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v)
+{
+ if (channel->rcc) {
+ v(channel->rcc);
+ }
+}
+
+void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data)
+{
+ if (channel->rcc) {
+ v(channel->rcc, data);
+ }
+}
+
+void red_channel_set_shut(RedChannel *channel)
+{
+ if (channel->rcc) {
+ channel->rcc->incoming.shut = TRUE;
+ }
}
int red_channel_all_blocked(RedChannel *channel)
{
- return channel->send_data.blocked;
+ return !channel || !channel->rcc || channel->rcc->send_data.blocked;
}
int red_channel_any_blocked(RedChannel *channel)
{
- return channel->send_data.blocked;
+ return !channel || !channel->rcc || channel->rcc->send_data.blocked;
}
-int red_channel_send_message_pending(RedChannel *channel)
+int red_channel_client_blocked(RedChannelClient *rcc)
{
- return channel->send_data.header->type != 0;
+ return rcc && rcc->send_data.blocked;
}
-/* accessors for RedChannel */
-SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel)
+int red_channel_client_send_message_pending(RedChannelClient *rcc)
{
- return channel->send_data.marshaller;
+ return rcc->send_data.header->type != 0;
}
-RedsStream *red_channel_get_stream(RedChannel *channel)
+/* accessors for RedChannelClient */
+SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
{
- return channel->stream;
+ return rcc->send_data.marshaller;
}
-SpiceDataHeader *red_channel_get_header(RedChannel *channel)
+RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
{
- return channel->send_data.header;
+ return rcc->stream;
+}
+
+SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc)
+{
+ return rcc->send_data.header;
}
/* end of accessors */
int red_channel_get_first_socket(RedChannel *channel)
{
- if (!channel->stream) {
+ if (!channel->rcc || !channel->rcc->stream) {
return -1;
}
- return channel->stream->socket;
+ return channel->rcc->stream->socket;
+}
+
+int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item)
+{
+ return rcc->send_data.item == item;
}
int red_channel_item_being_sent(RedChannel *channel, PipeItem *item)
{
- return channel->send_data.item == item;
+ return channel->rcc && red_channel_client_item_being_sent(channel->rcc, item);
}
int red_channel_no_item_being_sent(RedChannel *channel)
{
- return channel->send_data.item == NULL;
+ return !channel->rcc || red_channel_client_no_item_being_sent(channel->rcc);
}
-void red_channel_disconnect(RedChannel *channel)
+int red_channel_client_no_item_being_sent(RedChannelClient *rcc)
{
- red_channel_pipe_clear(channel);
- reds_stream_free(channel->stream);
- channel->stream = NULL;
- channel->send_data.blocked = FALSE;
- channel->send_data.size = 0;
+ return !rcc || (rcc->send_data.size == 0);
+}
+
+static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item)
+{
+ rcc->channel->pipe_size--;
+ ring_remove(&item->link);
+}
+
+void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
+ PipeItem *item)
+{
+ red_channel_client_pipe_remove(rcc, item);
+ red_channel_client_release_item(rcc, item, FALSE);
}