summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server/red_channel.c65
-rw-r--r--server/red_channel.h34
2 files changed, 57 insertions, 42 deletions
diff --git a/server/red_channel.c b/server/red_channel.c
index 0a36698..783fa48 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -84,7 +84,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
((uint8_t *)&handler->header) + handler->header_pos,
sizeof(SpiceDataHeader) - handler->header_pos);
if (bytes_read == -1) {
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
handler->header_pos += bytes_read;
@@ -96,10 +96,10 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
if (handler->msg_pos < handler->header.size) {
if (!handler->msg) {
- handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header);
+ handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header);
if (handler->msg == NULL) {
red_printf("ERROR: channel refused to allocate buffer.");
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
}
@@ -108,8 +108,8 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
handler->msg + handler->msg_pos,
handler->header.size - handler->msg_pos);
if (bytes_read == -1) {
- handler->release_msg_buf(handler->opaque, &handler->header, handler->msg);
- handler->on_error(handler->opaque);
+ handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg);
+ handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos += bytes_read;
@@ -118,24 +118,24 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
}
}
- if (handler->parser) {
- parsed = handler->parser(handler->msg,
+ if (handler->cb->parser) {
+ parsed = handler->cb->parser(handler->msg,
handler->msg + handler->header.size, handler->header.type,
SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
if (parsed == NULL) {
red_printf("failed to parse message type %d", handler->header.type);
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
- ret_handle = handler->handle_parsed(handler->opaque, parsed_size,
+ ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
handler->header.type, parsed);
parsed_free(parsed);
} else {
- ret_handle = handler->handle_message(handler->opaque, &handler->header,
+ ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
handler->msg);
}
if (handler->shut) {
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos = 0;
@@ -143,7 +143,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
handler->header_pos = 0;
if (!ret_handle) {
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
}
@@ -160,35 +160,35 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle
if (handler->size == 0) {
handler->vec = handler->vec_buf;
- handler->size = handler->get_msg_size(handler->opaque);
+ handler->size = handler->cb->get_msg_size(handler->opaque);
if (!handler->size) { // nothing to be sent
return;
}
}
for (;;) {
- handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
+ handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
n = reds_stream_writev(stream, handler->vec, handler->vec_size);
if (n == -1) {
switch (errno) {
case EAGAIN:
- handler->on_block(handler->opaque);
+ handler->cb->on_block(handler->opaque);
return;
case EINTR:
continue;
case EPIPE:
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
default:
red_printf("%s", strerror(errno));
- handler->on_error(handler->opaque);
+ handler->cb->on_error(handler->opaque);
return;
}
} else {
handler->pos += n;
stat_inc_counter(handler->out_bytes_counter, n);
if (handler->pos == handler->size) { // finished writing data
- handler->on_msg_done(handler->opaque);
+ handler->cb->on_msg_done(handler->opaque);
handler->vec = handler->vec_buf;
handler->pos = 0;
handler->size = 0;
@@ -351,21 +351,24 @@ RedChannel *red_channel_create(int size, RedsStream *stream,
channel->send_data.marshaller = spice_marshaller_new();
channel->incoming.opaque = channel;
- channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
- channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
- channel->incoming.handle_message = (handle_message_proc)handle_message;
- channel->incoming.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
+ channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
+ channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
+ channel->incoming_cb.handle_message = (handle_message_proc)handle_message;
+ channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
channel->outgoing.opaque = channel;
channel->outgoing.pos = 0;
channel->outgoing.size = 0;
channel->outgoing.out_bytes_counter = NULL;
- channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
- channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
- channel->outgoing.on_block = red_channel_peer_on_out_block;
- channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
- channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
+ channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size;
+ channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg;
+ channel->outgoing_cb.on_block = red_channel_peer_on_out_block;
+ channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
+ channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done;
+
+ channel->incoming.cb = &channel->incoming_cb;
+ channel->outgoing.cb = &channel->outgoing_cb;
channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
@@ -422,12 +425,12 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream,
if (channel == NULL) {
return NULL;
}
- channel->incoming.handle_parsed = (handle_parsed_proc)handle_parsed;
- channel->incoming.parser = parser;
+ channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
+ channel->incoming_cb.parser = parser;
channel->on_incoming_error = incoming_error;
channel->on_outgoing_error = outgoing_error;
- channel->incoming.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
- channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
+ channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
+ channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
return channel;
}
diff --git a/server/red_channel.h b/server/red_channel.h
index 15c3b3c..7ec0734 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -44,12 +44,7 @@ typedef void (*release_msg_recv_buf_proc)(void *opaque,
SpiceDataHeader *msg_header, uint8_t *msg);
typedef void (*on_incoming_error_proc)(void *opaque);
-typedef struct IncomingHandler {
- void *opaque;
- SpiceDataHeader header;
- uint32_t header_pos;
- uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
- uint32_t msg_pos;
+typedef struct IncomingHandlerInterface {
handle_message_proc handle_message;
alloc_msg_recv_buf_proc alloc_msg_buf;
on_incoming_error_proc on_error; // recv error or handle_message error
@@ -57,6 +52,15 @@ typedef struct IncomingHandler {
// The following is an optional alternative to handle_message, used if not null
spice_parse_channel_func_t parser;
handle_parsed_proc handle_parsed;
+} IncomingHandlerInterface;
+
+typedef struct IncomingHandler {
+ IncomingHandlerInterface *cb;
+ void *opaque;
+ SpiceDataHeader header;
+ uint32_t header_pos;
+ uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
+ uint32_t msg_pos;
int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX
} IncomingHandler;
@@ -65,18 +69,23 @@ typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_
typedef void (*on_outgoing_error_proc)(void *opaque);
typedef void (*on_outgoing_block_proc)(void *opaque);
typedef void (*on_outgoing_msg_done_proc)(void *opaque);
+
+typedef struct OutgoingHandlerInterface {
+ get_outgoing_msg_size_proc get_msg_size;
+ prepare_outgoing_proc prepare;
+ on_outgoing_error_proc on_error;
+ on_outgoing_block_proc on_block;
+ on_outgoing_msg_done_proc on_msg_done;
+} OutgoingHandlerInterface;
+
typedef struct OutgoingHandler {
+ OutgoingHandlerInterface *cb;
void *opaque;
struct iovec vec_buf[MAX_SEND_VEC];
int vec_size;
struct iovec *vec;
int pos;
int size;
- get_outgoing_msg_size_proc get_msg_size;
- prepare_outgoing_proc prepare;
- on_outgoing_error_proc on_error;
- on_outgoing_block_proc on_block;
- on_outgoing_msg_done_proc on_msg_done;
#ifdef RED_STATISTICS
uint64_t *out_bytes_counter;
#endif
@@ -157,6 +166,9 @@ struct RedChannel {
OutgoingHandler outgoing;
IncomingHandler incoming;
+ OutgoingHandlerInterface outgoing_cb;
+ IncomingHandlerInterface incoming_cb;
+
channel_disconnect_proc disconnect;
channel_send_pipe_item_proc send_item;
channel_hold_pipe_item_proc hold_item;