/*
Copyright (C) 2009 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _H_REDCHANNEL
#define _H_REDCHANNEL
#include "common.h"
#include "utils.h"
#include "threads.h"
#include "red_peer.h"
#include "platform.h"
#include "process_loop.h"
#include "demarshallers.h"
#include "marshallers.h"
/* Channel state values. NOTE(review): presumably the values held by
   RedChannel::_state / passed to RedChannel::set_state() - confirm in the
   implementation file. Numeric values are spelled out; they are the same
   ones the compiler assigned implicitly. */
enum {
    PASSIVE_STATE = 0,
    DISCONNECTED_STATE = 1,
    CONNECTING_STATE = 2,
    CONNECTED_STATE = 3,
    TERMINATED_STATE = 4
};
/* Channel action values. NOTE(review): presumably the values held by
   RedChannel::_action - confirm in the implementation file. Numeric values
   are spelled out; they are the same ones the compiler assigned implicitly. */
enum {
    WAIT_ACTION = 0,
    CONNECT_ACTION = 1,
    DISCONNECT_ACTION = 2,
    QUIT_ACTION = 3
};
class RedClient;
class RedChannel;

/* Container of 32-bit capability values. The element type matches the
   uint32_t `cap` arguments of RedChannelBase::set_capability() and
   RedChannelBase::test_capability(). */
typedef std::vector<uint32_t> ChannelCaps;
/*
 * RedPeer specialization that carries a channel type/id pair, the local
 * capability lists, and the version numbers and capability lists kept for
 * the remote side. All out-of-line members are defined outside this header.
 */
class RedChannelBase: public RedPeer {
public:
    RedChannelBase(uint8_t type, uint8_t id, const ChannelCaps& common_caps,
                   const ChannelCaps& caps);
    virtual ~RedChannelBase();

    /* Read-only accessors. They are const so they can also be called through
       const references/pointers to a channel. */
    uint8_t get_type() const { return _type; }
    uint8_t get_id() const { return _id; }

    /* NOTE(review): `password` is taken by value; the signature is left as-is
       because the definition lives outside this header. */
    void connect(const ConnectionOptions& options, uint32_t connection_id, const char *host,
                 std::string password);

    const ChannelCaps& get_common_caps() const { return _common_caps; }
    const ChannelCaps& get_caps() const { return _caps; }

    /* Version numbers stored in _remote_major/_remote_minor. */
    uint32_t get_peer_major() const { return _remote_major; }
    uint32_t get_peer_minor() const { return _remote_minor; }

protected:
    /* Capability helpers available to derived channels. */
    void set_common_capability(uint32_t cap);
    void set_capability(uint32_t cap);
    bool test_common_capability(uint32_t cap);
    bool test_capability(uint32_t cap);

private:
    /* Overloads operating on an explicit capability list. */
    void set_capability(ChannelCaps& caps, uint32_t cap);
    bool test_capability(const ChannelCaps& caps, uint32_t cap);
    void link(uint32_t connection_id, const std::string& password, int protocol);

private:
    uint8_t _type;
    uint8_t _id;
    ChannelCaps _common_caps;
    ChannelCaps _caps;
    ChannelCaps _remote_common_caps;
    ChannelCaps _remote_caps;
    uint32_t _remote_major;
    uint32_t _remote_minor;
};
/*
 * Trigger event source that keeps a reference to one RedChannel.
 * on_event() is defined outside this header; RedChannel declares this class
 * a friend. NOTE(review): presumably it forwards to
 * RedChannel::on_send_trigger() - confirm in the implementation file.
 */
class SendTrigger: public EventSources::Trigger {
public:
    SendTrigger(RedChannel& channel);

    virtual void on_event();

private:
    RedChannel& _channel;   /* channel passed to the constructor */
};
/*
 * Stateless trigger event source. on_event() is defined outside this header.
 */
class AbortTrigger: public EventSources::Trigger {
public:
    virtual void on_event();
};
/*
 * Event whose response() receives the process loop it is handled on.
 * RedChannel declares this class a friend. NOTE(review): presumably
 * response() ends up in RedChannel::do_migration_disconnect_src() - confirm
 * in the implementation file.
 */
class MigrationDisconnectSrcEvent: public Event {
public:
    virtual void response(AbstractProcessLoop& events_loop);
};
/*
 * Event whose response() receives the process loop it is handled on.
 * RedChannel declares this class a friend. NOTE(review): presumably
 * response() ends up in RedChannel::do_migration_connect_target() - confirm
 * in the implementation file.
 */
class MigrationConnectTargetEvent: public Event {
public:
    virtual void response(AbstractProcessLoop& events_loop);
};
/*
 * Plain bundle of three pointers: a lock, a condition and a 64-bit message
 * serial. The struct has no constructor or destructor, so it neither
 * initializes the pointers nor creates or releases the objects they refer to.
 */
struct SyncInfo {
    Mutex* lock;
    Condition* condition;
    uint64_t* message_serial;
};
/*
 * A channel owned by a RedClient. It adds a message handler, an outgoing
 * message queue, a worker thread handle and a process loop with send/abort
 * triggers on top of RedChannelBase. All out-of-line members are defined
 * outside this header.
 */
class RedChannel: public RedChannelBase {
public:
    class MessageHandler;
    class OutMessage;

    RedChannel(RedClient& client, uint8_t type, uint8_t id, MessageHandler* handler,
               Platform::ThreadPriority worker_priority = Platform::PRIORITY_NORMAL);
    virtual ~RedChannel();

    void start();

    virtual void connect();
    virtual void disconnect();
    virtual bool abort();
    virtual void disconnect_migration_src();
    virtual void connect_migration_target();

    virtual CompoundInMessage *receive();
    virtual void post_message(RedChannel::OutMessage* message);

    /* Read-only accessors; const so they can be used on const channels. */
    int get_connection_error() const { return _error; }
    Platform::ThreadPriority get_worker_priority() const { return _worker_priority; }

protected:
    RedClient& get_client() { return _client; }
    ProcessLoop& get_process_loop() { return _loop; }
    /* Returns the handler held by _message_handler without giving up ownership. */
    MessageHandler* get_message_handler() { return _message_handler.get(); }

    /* Hooks for derived channels. The defaults do nothing, except the two
       migration variants, which fall back to on_disconnect()/on_connect(). */
    virtual void on_connecting() {}
    virtual void on_connect() {}
    virtual void on_disconnect() {}
    virtual void on_migrate() {}
    virtual void on_disconnect_mig_src() { on_disconnect(); }
    virtual void on_connect_mig_target() { on_connect(); }

    /* Message handlers with the signature expected by
       MessageHandlerImp::set_handler(). */
    void handle_migrate(RedPeer::InMessage* message);
    void handle_set_ack(RedPeer::InMessage* message);
    void handle_ping(RedPeer::InMessage* message);
    void handle_wait_for_channels(RedPeer::InMessage* message);
    void handle_disconnect(RedPeer::InMessage* message);
    void handle_notify(RedPeer::InMessage* message);

    SpiceMessageMarshallers *_marshallers;

private:
    void set_state(int state);
    void run();
    void send_migrate_flush_mark();
    void send_messages();
    void receive_messages();
    void on_send_trigger();
    virtual void on_event();
    void on_message_received();
    /* Name (including its spelling) is kept: the definition lives outside
       this header. */
    void on_message_complition(uint64_t serial);
    void do_migration_disconnect_src();
    void do_migration_connect_target();

    static void* worker_main(void *);

    RedChannel::OutMessage* get_outgoing_message();
    void clear_outgoing_messages();

private:
    /* Member order is significant: it fixes construction order. */
    RedClient& _client;
    int _state;
    int _action;
    int _error;
    bool _wait_for_threads;
    bool _socket_in_loop;
    Thread* _worker;
    Platform::ThreadPriority _worker_priority;
    /* Owning pointer; the element type follows from get_message_handler(). */
    std::auto_ptr<MessageHandler> _message_handler;
    Mutex _state_lock;
    Condition _state_cond;
    Mutex _action_lock;
    Condition _action_cond;
    SyncInfo _sync_info;

    Mutex _outgoing_lock;
    /* Element type follows from post_message()/get_outgoing_message(). */
    std::list<RedChannel::OutMessage*> _outgoing_messages;
    RedChannel::OutMessage* _outgoing_message;
    uint32_t _outgoing_pos;

    SpiceDataHeader _incomming_header;
    uint32_t _incomming_header_pos;
    RedPeer::CompoundInMessage* _incomming_message;
    uint32_t _incomming_message_pos;

    uint32_t _message_ack_count;
    uint32_t _message_ack_window;

    ProcessLoop _loop;
    SendTrigger _send_trigger;
    AbortTrigger _abort_trigger;

    uint64_t _disconnect_stamp;
    uint64_t _disconnect_reason;

    friend class SendTrigger;
    friend class MigrationDisconnectSrcEvent;
    friend class MigrationConnectTargetEvent;
};
/*
 * Abstract outgoing message as accepted by RedChannel::post_message().
 */
class RedChannel::OutMessage {
public:
    OutMessage() {}
    virtual ~OutMessage() {}

    /* Gives access to the RedPeer::OutMessage behind this message. */
    virtual RedPeer::OutMessage& peer_message() = 0;

    /* Disposes of the message; how is up to the implementation
       (Message::release() uses `delete this`). */
    virtual void release() = 0;
};
/*
 * Concrete outgoing message that is both a RedChannel::OutMessage and a
 * RedPeer::OutMessage, so peer_message() simply returns the object itself.
 * release() deletes the object, so instances passed to release() must have
 * been allocated with new.
 */
class Message: public RedChannel::OutMessage, public RedPeer::OutMessage {
public:
    Message(uint32_t type)
        : RedChannel::OutMessage()
        , RedPeer::OutMessage(type)
    {
    }

    virtual RedPeer::OutMessage& peer_message()
    {
        return *this;
    }

    virtual void release()
    {
        delete this;
    }
};
/*
 * Abstract sink for received compound messages. RedChannel holds one
 * instance of it (see RedChannel::get_message_handler()).
 */
class RedChannel::MessageHandler {
public:
    MessageHandler() {}
    virtual ~MessageHandler() {}

    /* Processes one received compound message. */
    virtual void handle_message(RedPeer::CompoundInMessage& message) = 0;
};
/*
 * MessageHandler that dispatches parsed messages to member functions of
 * HandlerClass, indexed by message type.
 *
 * HandlerClass - type of the object whose member functions handle messages;
 *                it must provide get_peer_major() and get_peer_minor().
 * channel_id   - channel identifier passed to the
 *                spice_get_server_channel_parser*() lookups.
 */
template <class HandlerClass, unsigned int channel_id>
class MessageHandlerImp: public RedChannel::MessageHandler {
public:
    MessageHandlerImp(HandlerClass& obj);
    ~MessageHandlerImp() { delete [] _handlers; }

    virtual void handle_message(RedPeer::CompoundInMessage& message);

    /* Signature every registered handler must have. */
    typedef void (HandlerClass::*Handler)(RedPeer::InMessage* message);

    /* Registers `handler` for message type `id`; throws when id exceeds
       _max_messages. */
    void set_handler(unsigned int id, Handler handler);

private:
    /* The class owns the _handlers array through a raw pointer, so a copy
       would delete[] it twice. Copying is disabled (declared, not defined). */
    MessageHandlerImp(const MessageHandlerImp&);
    MessageHandlerImp& operator=(const MessageHandlerImp&);

private:
    HandlerClass& _obj;
    unsigned int _max_messages;
    spice_parse_channel_func_t _parser;
    Handler *_handlers;
};
template
MessageHandlerImp::MessageHandlerImp(HandlerClass& obj)
: _obj (obj)
, _parser (NULL)
{
/* max_messages is always from current as its larger than for backwards compat */
spice_get_server_channel_parser(channel_id, &_max_messages);
_handlers = new Handler[_max_messages + 1];
memset(_handlers, 0, sizeof(Handler) * (_max_messages + 1));
}
template
void MessageHandlerImp::handle_message(RedPeer::CompoundInMessage&
message)
{
uint8_t *msg;
uint8_t *parsed;
uint16_t type;
uint32_t size;
size_t parsed_size;
message_destructor_t parsed_free;
if (_parser == NULL) {
/* We need to do this lazily rather than at constuction because we
don't know the major until we've connected */
if (_obj.get_peer_major() == 1) {
_parser = spice_get_server_channel_parser1(channel_id, NULL);
} else {
_parser = spice_get_server_channel_parser(channel_id, NULL);
}
}
if (message.sub_list()) {
SpiceSubMessageList *sub_list;
sub_list = (SpiceSubMessageList *)(message.data() + message.sub_list());
for (int i = 0; i < sub_list->size; i++) {
SpiceSubMessage *sub = (SpiceSubMessage *)(message.data() + sub_list->sub_messages[i]);
msg = (uint8_t *)(sub + 1);
type = sub->type;
size = sub->size;
parsed = _parser(msg, msg + size, type, _obj.get_peer_minor(), &parsed_size, &parsed_free);
if (parsed == NULL) {
THROW("failed to parse message type %d", type);
}
RedPeer::InMessage sub_message(type, parsed_size, parsed);
(_obj.*_handlers[type])(&sub_message);
parsed_free(parsed);
}
}
msg = message.data();
type = message.type();
size = message.size();
parsed = _parser(msg, msg + size, type, _obj.get_peer_minor(), &parsed_size, &parsed_free);
if (parsed == NULL) {
THROW("failed to parse message channel %d type %d", channel_id, type);
}
RedPeer::InMessage main_message(type, parsed_size, parsed);
(_obj.*_handlers[type])(&main_message);
parsed_free(parsed);
}
template
void MessageHandlerImp::set_handler(unsigned int id, Handler handler)
{
if (id > _max_messages) {
THROW("bad handler id");
}
_handlers[id] = handler;
}
#endif