diff options
author | Alon Levy <alevy@redhat.com> | 2010-11-12 12:54:14 +0200 |
---|---|---|
committer | Alon Levy <alevy@redhat.com> | 2011-03-02 17:27:53 +0200 |
commit | 692b41f946a598bb459419bb94f4fb55474dd55e (patch) | |
tree | 0763bfa86137a460bb422915239080386733b3a4 | |
parent | d1feaeb2820c7566c6d82a2e7dbb62d237eeb440 (diff) |
server/red_channel: split Incoming/Outgoing to callback and state
This allows later to have the callback table under RedChannel when
the callbacks actually get used by RedChannelClient. Since the cb's
are identical for different clients of the same channel it makes sense
to store the callback pointers in one place per channel. The rest of
the incoming and outgoing struct just gets moved to RedChannelClient.
-rw-r--r-- | server/red_channel.c | 65 | ||||
-rw-r--r-- | server/red_channel.h | 34 |
2 files changed, 57 insertions, 42 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 0a36698..783fa48 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -84,7 +84,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle ((uint8_t *)&handler->header) + handler->header_pos, sizeof(SpiceDataHeader) - handler->header_pos); if (bytes_read == -1) { - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } handler->header_pos += bytes_read; @@ -96,10 +96,10 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle if (handler->msg_pos < handler->header.size) { if (!handler->msg) { - handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header); + handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header); if (handler->msg == NULL) { red_printf("ERROR: channel refused to allocate buffer."); - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } } @@ -108,8 +108,8 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle handler->msg + handler->msg_pos, handler->header.size - handler->msg_pos); if (bytes_read == -1) { - handler->release_msg_buf(handler->opaque, &handler->header, handler->msg); - handler->on_error(handler->opaque); + handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->cb->on_error(handler->opaque); return; } handler->msg_pos += bytes_read; @@ -118,24 +118,24 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle } } - if (handler->parser) { - parsed = handler->parser(handler->msg, + if (handler->cb->parser) { + parsed = handler->cb->parser(handler->msg, handler->msg + handler->header.size, handler->header.type, SPICE_VERSION_MINOR, &parsed_size, &parsed_free); if (parsed == NULL) { red_printf("failed to parse message type %d", handler->header.type); - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } - ret_handle = handler->handle_parsed(handler->opaque, parsed_size, + ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, handler->header.type, parsed); parsed_free(parsed); } else { - ret_handle = handler->handle_message(handler->opaque, &handler->header, + ret_handle = handler->cb->handle_message(handler->opaque, &handler->header, handler->msg); } if (handler->shut) { - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } handler->msg_pos = 0; @@ -143,7 +143,7 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle handler->header_pos = 0; if (!ret_handle) { - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } } @@ -160,35 +160,35 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle if (handler->size == 0) { handler->vec = handler->vec_buf; - handler->size = handler->get_msg_size(handler->opaque); + handler->size = handler->cb->get_msg_size(handler->opaque); if (!handler->size) { // nothing to be sent return; } } for (;;) { - handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); + handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); n = reds_stream_writev(stream, handler->vec, handler->vec_size); if (n == -1) { switch (errno) { case EAGAIN: - handler->on_block(handler->opaque); + handler->cb->on_block(handler->opaque); return; case EINTR: continue; case EPIPE: - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; default: red_printf("%s", strerror(errno)); - handler->on_error(handler->opaque); + handler->cb->on_error(handler->opaque); return; } } else { handler->pos += n; stat_inc_counter(handler->out_bytes_counter, n); if (handler->pos == handler->size) { // finished writing data - handler->on_msg_done(handler->opaque); + handler->cb->on_msg_done(handler->opaque); handler->vec = handler->vec_buf; handler->pos = 0; handler->size = 0; @@ -351,21 +351,24 @@ RedChannel *red_channel_create(int size, RedsStream *stream, channel->send_data.marshaller = spice_marshaller_new(); channel->incoming.opaque = channel; - channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; - channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; - channel->incoming.handle_message = (handle_message_proc)handle_message; - channel->incoming.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error; + channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; + channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; + channel->incoming_cb.handle_message = (handle_message_proc)handle_message; + channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error; channel->outgoing.opaque = channel; channel->outgoing.pos = 0; channel->outgoing.size = 0; channel->outgoing.out_bytes_counter = NULL; - channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; - channel->outgoing.prepare = red_channel_peer_prepare_out_msg; - channel->outgoing.on_block = red_channel_peer_on_out_block; - channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; - channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; + channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size; + channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg; + channel->outgoing_cb.on_block = red_channel_peer_on_out_block; + channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; + channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done; + + channel->incoming.cb = &channel->incoming_cb; + channel->outgoing.cb = &channel->outgoing_cb; channel->shut = 0; // came here from inputs, perhaps can be removed? XXX @@ -422,12 +425,12 @@ RedChannel *red_channel_create_parser(int size, RedsStream *stream, if (channel == NULL) { return NULL; } - channel->incoming.handle_parsed = (handle_parsed_proc)handle_parsed; - channel->incoming.parser = parser; + channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; + channel->incoming_cb.parser = parser; channel->on_incoming_error = incoming_error; channel->on_outgoing_error = outgoing_error; - channel->incoming.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; - channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; + channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; + channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; return channel; } diff --git a/server/red_channel.h b/server/red_channel.h index 15c3b3c..7ec0734 100644 --- a/server/red_channel.h +++ b/server/red_channel.h @@ -44,12 +44,7 @@ typedef void (*release_msg_recv_buf_proc)(void *opaque, SpiceDataHeader *msg_header, uint8_t *msg); typedef void (*on_incoming_error_proc)(void *opaque); -typedef struct IncomingHandler { - void *opaque; - SpiceDataHeader header; - uint32_t header_pos; - uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. - uint32_t msg_pos; +typedef struct IncomingHandlerInterface { handle_message_proc handle_message; alloc_msg_recv_buf_proc alloc_msg_buf; on_incoming_error_proc on_error; // recv error or handle_message error @@ -57,6 +52,15 @@ typedef struct IncomingHandler { // The following is an optional alternative to handle_message, used if not null spice_parse_channel_func_t parser; handle_parsed_proc handle_parsed; +} IncomingHandlerInterface; + +typedef struct IncomingHandler { + IncomingHandlerInterface *cb; + void *opaque; + SpiceDataHeader header; + uint32_t header_pos; + uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. + uint32_t msg_pos; int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX } IncomingHandler; @@ -65,18 +69,23 @@ typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_ typedef void (*on_outgoing_error_proc)(void *opaque); typedef void (*on_outgoing_block_proc)(void *opaque); typedef void (*on_outgoing_msg_done_proc)(void *opaque); + +typedef struct OutgoingHandlerInterface { + get_outgoing_msg_size_proc get_msg_size; + prepare_outgoing_proc prepare; + on_outgoing_error_proc on_error; + on_outgoing_block_proc on_block; + on_outgoing_msg_done_proc on_msg_done; +} OutgoingHandlerInterface; + typedef struct OutgoingHandler { + OutgoingHandlerInterface *cb; void *opaque; struct iovec vec_buf[MAX_SEND_VEC]; int vec_size; struct iovec *vec; int pos; int size; - get_outgoing_msg_size_proc get_msg_size; - prepare_outgoing_proc prepare; - on_outgoing_error_proc on_error; - on_outgoing_block_proc on_block; - on_outgoing_msg_done_proc on_msg_done; #ifdef RED_STATISTICS uint64_t *out_bytes_counter; #endif @@ -157,6 +166,9 @@ struct RedChannel { OutgoingHandler outgoing; IncomingHandler incoming; + OutgoingHandlerInterface outgoing_cb; + IncomingHandlerInterface incoming_cb; + channel_disconnect_proc disconnect; channel_send_pipe_item_proc send_item; channel_hold_pipe_item_proc hold_item; |