diff options
-rw-r--r-- | server/red_channel.c | 65 | ||||
-rw-r--r-- | server/red_channel.h | 34 |
2 files changed, 57 insertions, 42 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 0a36698..783fa48 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -84,7 +84,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle ((uint8_t *)&handler->header) + handler->header_pos, sizeof(SpiceDataHeader) - handler->header_pos); if (bytes_read == -1) { - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } handler->header_pos += bytes_read; @@ -96,10 +96,10 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle if (handler->msg_pos < handler->header.size) { if (!handler->msg) { - handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header); + handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header); if (handler->msg == NULL) { red_printf("ERROR: channel refused to allocate buffer."); - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } } @@ -108,8 +108,8 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle handler->msg + handler->msg_pos, handler->header.size - handler->msg_pos); if (bytes_read == -1) { - handler->release_msg_buf(handler->opaque, &handler->header, handler->msg); - handler->on_error(handler->opaque); + handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->cb->on_error(handler->opaque); return; } handler->msg_pos += bytes_read; @@ -118,24 +118,24 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle } } - if (handler->parser) { - parsed = handler->parser(handler->msg, + if (handler->cb->parser) { + parsed = handler->cb->parser(handler->msg, handler->msg + handler->header.size, handler->header.type, SPICE_VERSION_MINOR, &parsed_size, &parsed_free); if (parsed == NULL) { red_printf("failed to parse message type %d", handler->header.type); - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } - ret_handle = handler->handle_parsed(handler->opaque, parsed_size, + ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, handler->header.type, parsed); parsed_free(parsed); } else { - ret_handle = handler->handle_message(handler->opaque, &handler->header, + ret_handle = handler->cb->handle_message(handler->opaque, &handler->header, handler->msg); } if (handler->shut) { - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } handler->msg_pos = 0; @@ -143,7 +143,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle handler->header_pos = 0; if (!ret_handle) { - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } } @@ -160,35 +160,35 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle if (handler->size == 0) { handler->vec = handler->vec_buf; - handler->size = handler->get_msg_size(handler->opaque); + handler->size = handler->cb->get_msg_size(handler->opaque); if (!handler->size) { // nothing to be sent return; } } for (;;) { - handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); + handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); n = reds_stream_writev(stream, handler->vec, handler->vec_size); if (n == -1) { switch (errno) { case EAGAIN: - handler->on_block(handler->opaque); + handler->cb->on_block(handler->opaque); return; case EINTR: continue; case EPIPE: - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; default: red_printf("%s", strerror(errno)); - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } } else { handler->pos += n; stat_inc_counter(handler->out_bytes_counter, n); if (handler->pos == handler->size) { // finished writing data - handler->on_msg_done(handler->opaque); + handler->cb->on_msg_done(handler->opaque); handler->vec = handler->vec_buf; handler->pos = 0; handler->size = 0; @@ -351,21 +351,24 @@ RedChannel *red_channel_create(int size, RedsStream *stream, channel->send_data.marshaller = spice_marshaller_new(); channel->incoming.opaque = channel; - channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; - channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; - channel->incoming.handle_message = (handle_message_proc)handle_message; - channel->incoming.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error; + channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; + channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; + channel->incoming_cb.handle_message = (handle_message_proc)handle_message; + channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error; channel->outgoing.opaque = channel; channel->outgoing.pos = 0; channel->outgoing.size = 0; channel->outgoing.out_bytes_counter = NULL; - channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; - channel->outgoing.prepare = red_channel_peer_prepare_out_msg; - channel->outgoing.on_block = red_channel_peer_on_out_block; - channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; - channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; + channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size; + channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg; + channel->outgoing_cb.on_block = red_channel_peer_on_out_block; + channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; + channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done; + + channel->incoming.cb = &channel->incoming_cb; + channel->outgoing.cb = &channel->outgoing_cb; channel->shut = 0; // came here from inputs, perhaps can be removed? XXX @@ -422,12 +425,12 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream, if (channel == NULL) { return NULL; } - channel->incoming.handle_parsed = (handle_parsed_proc)handle_parsed; - channel->incoming.parser = parser; + channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; + channel->incoming_cb.parser = parser; channel->on_incoming_error = incoming_error; channel->on_outgoing_error = outgoing_error; - channel->incoming.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; - channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; + channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; + channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; return channel; } diff --git a/server/red_channel.h b/server/red_channel.h index 15c3b3c..7ec0734 100644 --- a/server/red_channel.h +++ b/server/red_channel.h @@ -44,12 +44,7 @@ typedef void (*release_msg_recv_buf_proc)(void *opaque, SpiceDataHeader *msg_header, uint8_t *msg); typedef void (*on_incoming_error_proc)(void *opaque); -typedef struct IncomingHandler { - void *opaque; - SpiceDataHeader header; - uint32_t header_pos; - uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. - uint32_t msg_pos; +typedef struct IncomingHandlerInterface { handle_message_proc handle_message; alloc_msg_recv_buf_proc alloc_msg_buf; on_incoming_error_proc on_error; // recv error or handle_message error @@ -57,6 +52,15 @@ typedef struct IncomingHandler { // The following is an optional alternative to handle_message, used if not null spice_parse_channel_func_t parser; handle_parsed_proc handle_parsed; +} IncomingHandlerInterface; + +typedef struct IncomingHandler { + IncomingHandlerInterface *cb; + void *opaque; + SpiceDataHeader header; + uint32_t header_pos; + uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. + uint32_t msg_pos; int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX } IncomingHandler; @@ -65,18 +69,23 @@ typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_ typedef void (*on_outgoing_error_proc)(void *opaque); typedef void (*on_outgoing_block_proc)(void *opaque); typedef void (*on_outgoing_msg_done_proc)(void *opaque); + +typedef struct OutgoingHandlerInterface { + get_outgoing_msg_size_proc get_msg_size; + prepare_outgoing_proc prepare; + on_outgoing_error_proc on_error; + on_outgoing_block_proc on_block; + on_outgoing_msg_done_proc on_msg_done; +} OutgoingHandlerInterface; + typedef struct OutgoingHandler { + OutgoingHandlerInterface *cb; void *opaque; struct iovec vec_buf[MAX_SEND_VEC]; int vec_size; struct iovec *vec; int pos; int size; - get_outgoing_msg_size_proc get_msg_size; - prepare_outgoing_proc prepare; - on_outgoing_error_proc on_error; - on_outgoing_block_proc on_block; - on_outgoing_msg_done_proc on_msg_done; #ifdef RED_STATISTICS uint64_t *out_bytes_counter; #endif @@ -157,6 +166,9 @@ struct RedChannel { OutgoingHandler outgoing; IncomingHandler incoming; + OutgoingHandlerInterface outgoing_cb; + IncomingHandlerInterface incoming_cb; + channel_disconnect_proc disconnect; channel_send_pipe_item_proc send_item; channel_hold_pipe_item_proc hold_item; |