summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlon Levy <alevy@redhat.com>2010-11-12 12:54:14 +0200
committerAlon Levy <alevy@redhat.com>2011-03-02 17:27:53 +0200
commit692b41f946a598bb459419bb94f4fb55474dd55e (patch)
tree0763bfa86137a460bb422915239080386733b3a4
parentd1feaeb2820c7566c6d82a2e7dbb62d237eeb440 (diff)
server/red_channel: split Incoming/Outgoing to callback and state
This allows later to have the callback table under RedChannel when the callbacks actually get used by RedChannelClient. Since the cb's are identical for different clients of the same channel it makes sense to store the callback pointers in one place per channel. The rest of the incoming and outgoing struct just gets moved to RedChannelClient.
-rw-r--r--server/red_channel.c65
-rw-r--r--server/red_channel.h34
2 files changed, 57 insertions, 42 deletions
diff --git a/server/red_channel.c b/server/red_channel.c
index 0a36698..783fa48 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -84,7 +84,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
((uint8_t *)&handler->header) + handler->header_pos,
sizeof(SpiceDataHeader) - handler->header_pos);
if (bytes_read == -1) {
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
handler->header_pos += bytes_read;
@@ -96,10 +96,10 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
if (handler->msg_pos < handler->header.size) {
if (!handler->msg) {
- handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header);
+ handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header);
if (handler->msg == NULL) {
red_printf("ERROR: channel refused to allocate buffer.");
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
}
@@ -108,8 +108,8 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
handler->msg + handler->msg_pos,
handler->header.size - handler->msg_pos);
if (bytes_read == -1) {
- handler->release_msg_buf(handler->opaque, &handler->header, handler->msg);
- handler->on_error(handler->opaque);
+ handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg);
+ handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos += bytes_read;
@@ -118,24 +118,24 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
}
}
- if (handler->parser) {
- parsed = handler->parser(handler->msg,
+ if (handler->cb->parser) {
+ parsed = handler->cb->parser(handler->msg,
handler->msg + handler->header.size, handler->header.type,
SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
if (parsed == NULL) {
red_printf("failed to parse message type %d", handler->header.type);
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
- ret_handle = handler->handle_parsed(handler->opaque, parsed_size,
+ ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
handler->header.type, parsed);
parsed_free(parsed);
} else {
- ret_handle = handler->handle_message(handler->opaque, &handler->header,
+ ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
handler->msg);
}
if (handler->shut) {
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos = 0;
@@ -143,7 +143,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
handler->header_pos = 0;
if (!ret_handle) {
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
}
@@ -160,35 +160,35 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle
if (handler->size == 0) {
handler->vec = handler->vec_buf;
- handler->size = handler->get_msg_size(handler->opaque);
+ handler->size = handler->cb->get_msg_size(handler->opaque);
if (!handler->size) { // nothing to be sent
return;
}
}
for (;;) {
- handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
+ handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
n = reds_stream_writev(stream, handler->vec, handler->vec_size);
if (n == -1) {
switch (errno) {
case EAGAIN:
- handler->on_block(handler->opaque);
+ handler->cb->on_block(handler->opaque);
return;
case EINTR:
continue;
case EPIPE:
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
default:
red_printf("%s", strerror(errno));
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
} else {
handler->pos += n;
stat_inc_counter(handler->out_bytes_counter, n);
if (handler->pos == handler->size) { // finished writing data
- handler->on_msg_done(handler->opaque);
+ handler->cb->on_msg_done(handler->opaque);
handler->vec = handler->vec_buf;
handler->pos = 0;
handler->size = 0;
@@ -351,21 +351,24 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
channel->send_data.marshaller = spice_marshaller_new();
channel->incoming.opaque = channel;
- channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
- channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
- channel->incoming.handle_message = (handle_message_proc)handle_message;
- channel->incoming.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
+ channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
+ channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
+ channel->incoming_cb.handle_message = (handle_message_proc)handle_message;
+ channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
channel->outgoing.opaque = channel;
channel->outgoing.pos = 0;
channel->outgoing.size = 0;
channel->outgoing.out_bytes_counter = NULL;
- channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
- channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
- channel->outgoing.on_block = red_channel_peer_on_out_block;
- channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
- channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
+ channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size;
+ channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg;
+ channel->outgoing_cb.on_block = red_channel_peer_on_out_block;
+ channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
+ channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done;
+
+ channel->incoming.cb = &channel->incoming_cb;
+ channel->outgoing.cb = &channel->outgoing_cb;
channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
@@ -422,12 +425,12 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
if (channel == NULL) {
return NULL;
}
- channel->incoming.handle_parsed = (handle_parsed_proc)handle_parsed;
- channel->incoming.parser = parser;
+ channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
+ channel->incoming_cb.parser = parser;
channel->on_incoming_error = incoming_error;
channel->on_outgoing_error = outgoing_error;
- channel->incoming.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
- channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
+ channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
+ channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
return channel;
}
diff --git a/server/red_channel.h b/server/red_channel.h
index 15c3b3c..7ec0734 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -44,12 +44,7 @@ typedef void (*release_msg_recv_buf_proc)(void *opaque,
SpiceDataHeader *msg_header, uint8_t *msg);
typedef void (*on_incoming_error_proc)(void *opaque);
-typedef struct IncomingHandler {
- void *opaque;
- SpiceDataHeader header;
- uint32_t header_pos;
- uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
- uint32_t msg_pos;
+typedef struct IncomingHandlerInterface {
handle_message_proc handle_message;
alloc_msg_recv_buf_proc alloc_msg_buf;
on_incoming_error_proc on_error; // recv error or handle_message error
@@ -57,6 +52,15 @@ typedef struct IncomingHandler {
// The following is an optional alternative to handle_message, used if not null
spice_parse_channel_func_t parser;
handle_parsed_proc handle_parsed;
+} IncomingHandlerInterface;
+
+typedef struct IncomingHandler {
+ IncomingHandlerInterface *cb;
+ void *opaque;
+ SpiceDataHeader header;
+ uint32_t header_pos;
+ uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
+ uint32_t msg_pos;
int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX
} IncomingHandler;
@@ -65,18 +69,23 @@ typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_
typedef void (*on_outgoing_error_proc)(void *opaque);
typedef void (*on_outgoing_block_proc)(void *opaque);
typedef void (*on_outgoing_msg_done_proc)(void *opaque);
+
+typedef struct OutgoingHandlerInterface {
+ get_outgoing_msg_size_proc get_msg_size;
+ prepare_outgoing_proc prepare;
+ on_outgoing_error_proc on_error;
+ on_outgoing_block_proc on_block;
+ on_outgoing_msg_done_proc on_msg_done;
+} OutgoingHandlerInterface;
+
typedef struct OutgoingHandler {
+ OutgoingHandlerInterface *cb;
void *opaque;
struct iovec vec_buf[MAX_SEND_VEC];
int vec_size;
struct iovec *vec;
int pos;
int size;
- get_outgoing_msg_size_proc get_msg_size;
- prepare_outgoing_proc prepare;
- on_outgoing_error_proc on_error;
- on_outgoing_block_proc on_block;
- on_outgoing_msg_done_proc on_msg_done;
#ifdef RED_STATISTICS
uint64_t *out_bytes_counter;
#endif
@@ -157,6 +166,9 @@ struct RedChannel {
OutgoingHandler outgoing;
IncomingHandler incoming;
+ OutgoingHandlerInterface outgoing_cb;
+ IncomingHandlerInterface incoming_cb;
+
channel_disconnect_proc disconnect;
channel_send_pipe_item_proc send_item;
channel_hold_pipe_item_proc hold_item;