diff options
Diffstat (limited to 'client/red_client.cpp')
-rw-r--r-- | client/red_client.cpp | 811 |
1 files changed, 811 insertions, 0 deletions
diff --git a/client/red_client.cpp b/client/red_client.cpp new file mode 100644 index 00000000..941321fb --- /dev/null +++ b/client/red_client.cpp @@ -0,0 +1,811 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "common.h" +#include <math.h> +#include "red_client.h" +#include "application.h" +#include "utils.h" +#include "debug.h" + +Migrate::Migrate(RedClient& client) + : _client (client) + , _running (false) + , _aborting (false) + , _connected (false) + , _thread (NULL) + , _pending_con (0) +{ +} + +Migrate::~Migrate() +{ + ASSERT(!_thread); + delete_channels(); +} + +void Migrate::delete_channels() +{ + while (!_channels.empty()) { + MigChannels::iterator iter = _channels.begin(); + delete *iter; + _channels.erase(iter); + } +} + +void Migrate::clear_channels() +{ + Lock lock(_lock); + ASSERT(!_running); + delete_channels(); +} + +void Migrate::add_channel(MigChannel* channel) +{ + Lock lock(_lock); + _channels.push_back(channel); +} + +void Migrate::swap_peer(RedChannelBase& other) +{ + DBG(0, "channel type %u id %u", other.get_type(), other.get_id()); + try { + Lock lock(_lock); + MigChannels::iterator iter = _channels.begin(); + + if (_running) { + THROW("swap and running"); + } + + if (!_connected) { + THROW("not connected"); + } + + for (; iter != _channels.end(); ++iter) { + MigChannel* curr = *iter; + if (curr->get_type() == other.get_type() && curr->get_id() == other.get_id()) { + if (!curr->is_valid()) { + THROW("invalid"); + } + other.swap(curr); + curr->set_valid(false); + if (!--_pending_con) { + lock.unlock(); + _client.set_target(_host.c_str(), _port, _sport); + abort(); + } + return; + } + } + THROW("no channel"); + } catch (...) { + abort(); + throw; + } +} + +void Migrate::connect_one(MigChannel& channel, const RedPeer::ConnectionOptions& options, + uint32_t connection_id) +{ + if (_aborting) { + DBG(0, "aborting"); + THROW("aborting"); + } + channel.connect(options, connection_id, _host.c_str(), _password); + ++_pending_con; + channel.set_valid(true); +} + +void Migrate::run() +{ + uint32_t connection_id; + + DBG(0, ""); + try { + RedPeer::ConnectionOptions con_opt(_client.get_connection_options(RED_CHANNEL_MAIN), + _port, _port); + MigChannels::iterator iter = _channels.begin(); + connection_id = _client.get_connection_id(); + connect_one(**iter, con_opt, connection_id); + for (++iter; iter != _channels.end(); ++iter) { + con_opt = RedPeer::ConnectionOptions( + _client.get_connection_options((*iter)->get_type()), + _port, _sport); + connect_one(**iter, con_opt, connection_id); + } + _connected = true; + DBG(0, "connected"); + } catch (...) { + close_channels(); + } + + Lock lock(_lock); + _cond.notify_one(); + if (_connected) { + Message* message = new Message(REDC_MIGRATE_CONNECTED, 0); + _client.post_message(message); + } else { + Message* message = new Message(REDC_MIGRATE_CONNECT_ERROR, 0); + _client.post_message(message); + } + _running = false; +} + +void* Migrate::worker_main(void *data) +{ + Migrate* mig = (Migrate*)data; + mig->run(); + return NULL; +} + +void Migrate::start(const RedMigrationBegin* migrate) +{ + DBG(0, ""); + abort(); + _host.assign(migrate->host); + _port = migrate->port ? migrate->port : -1; + _sport = migrate->sport ? migrate->sport : -1; + _password = _client._password; + Lock lock(_lock); + _running = true; + lock.unlock(); + _thread = new Thread(Migrate::worker_main, this); +} + +void Migrate::disconnect_channels() +{ + MigChannels::iterator iter = _channels.begin(); + + for (; iter != _channels.end(); ++iter) { + (*iter)->disconnect(); + (*iter)->set_valid(false); + } +} + +void Migrate::close_channels() +{ + MigChannels::iterator iter = _channels.begin(); + + for (; iter != _channels.end(); ++iter) { + (*iter)->close(); + (*iter)->set_valid(false); + (*iter)->enable(); + } +} + +bool Migrate::abort() +{ + Lock lock(_lock); + if (_aborting) { + return false; + } + _aborting = true; + for (;;) { + disconnect_channels(); + if (!_running) { + break; + } + uint64_t timout = 1000 * 1000 * 10; /*10ms*/ + _cond.timed_wait(lock, timout); + } + close_channels(); + _pending_con = 0; + _connected = false; + _aborting = false; + if (_thread) { + _thread->join(); + delete _thread; + _thread = NULL; + } + return true; +} + +#define AGENT_TIMEOUT (1000 * 30) + +void agent_timer_proc(void *opaque, TimerID timer) +{ + Platform::deactivate_interval_timer(timer); + THROW_ERR(SPICEC_ERROR_CODE_AGENT_TIMEOUT, "vdagent timeout"); +} + +class MainChannelLoop: public MessageHandlerImp<RedClient, RED_MESSAGES_END> { +public: + MainChannelLoop(RedClient& client): MessageHandlerImp<RedClient, RED_MESSAGES_END>(client) {} +}; + +RedClient::RedClient(Application& application) + : RedChannel(*this, RED_CHANNEL_MAIN, 0, new MainChannelLoop(*this)) + , _application (application) + , _connection_id (0) + , _mouse_mode (RED_MOUSE_MODE_SERVER) + , _notify_disconnect (false) + , _aborting (false) + , _agent_connected (false) + , _agent_mon_config_sent (false) + , _agent_msg (new VDAgentMessage) + , _agent_msg_data (NULL) + , _agent_msg_pos (0) + , _agent_tokens (0) + , _agent_timer (Platform::create_interval_timer(agent_timer_proc, NULL)) + , _migrate (*this) + , _glz_window (0, _glz_debug) +{ + MainChannelLoop* message_loop = static_cast<MainChannelLoop*>(get_message_handler()); + + message_loop->set_handler(RED_MIGRATE, &RedClient::handle_migrate, 0); + message_loop->set_handler(RED_SET_ACK, &RedClient::handle_set_ack, sizeof(RedSetAck)); + message_loop->set_handler(RED_PING, &RedClient::handle_ping, sizeof(RedPing)); + message_loop->set_handler(RED_WAIT_FOR_CHANNELS, &RedClient::handle_wait_for_channels, + sizeof(RedWaitForChannels)); + message_loop->set_handler(RED_DISCONNECTING, &RedClient::handle_disconnect, + sizeof(RedDisconnect)); + message_loop->set_handler(RED_NOTIFY, &RedClient::handle_notify, sizeof(RedNotify)); + + message_loop->set_handler(RED_MIGRATE_BEGIN, &RedClient::handle_migrate_begin, + sizeof(RedMigrationBegin)); + message_loop->set_handler(RED_MIGRATE_CANCEL, &RedClient::handle_migrate_cancel, 0); + message_loop->set_handler(RED_INIT, &RedClient::handle_init, sizeof(RedInit)); + message_loop->set_handler(RED_CHANNELS_LIST, &RedClient::handle_channels, + sizeof(RedChannels)); + message_loop->set_handler(RED_MOUSE_MODE, &RedClient::handle_mouse_mode, + sizeof(RedMouseMode)); + message_loop->set_handler(RED_MULTI_MEDIA_TIME, &RedClient::handle_mm_time, + sizeof(RedMultiMediaTime)); + + message_loop->set_handler(RED_AGENT_CONNECTED, &RedClient::handle_agent_connected, 0); + message_loop->set_handler(RED_AGENT_DISCONNECTED, &RedClient::handle_agent_disconnected, + sizeof(RedAgentDisconnect)); + message_loop->set_handler(RED_AGENT_DATA, &RedClient::handle_agent_data, 0); + message_loop->set_handler(RED_AGENT_TOKEN, &RedClient::handle_agent_tokens, + sizeof(RedAgentTokens)); + if (_agent_timer == INVALID_TIMER) { + THROW("invalid agent timer"); + } + start(); +} + +RedClient::~RedClient() +{ + ASSERT(_channels.empty()); + Platform::destroy_interval_timer(_agent_timer); + delete _agent_msg; +} + +void RedClient::init(const char* host, int port, int sport, const char *password, + bool auto_display_res) +{ + _host = host; + _port = port; + _sport = sport; + _auto_display_res = auto_display_res; + + if (password != NULL) { + _password = password; + } else { + _password = ""; + } +} + +void RedClient::set_target(const char* host, uint16_t port, uint16_t sport) +{ + _port = port; + _sport = sport; + _host.assign(host); +} + +void RedClient::push_event(Event* event) +{ + _application.push_event(event); +} + +void RedClient::on_connecting() +{ + _notify_disconnect = true; +} + +void RedClient::on_connect() +{ + push_event(new ConnectedEvent()); + _migrate.add_channel(new MigChannel(RED_CHANNEL_MAIN, 0, get_common_caps(), + get_caps())); +} + +void RedClient::on_disconnect() +{ + _migrate.abort(); + _connection_id = 0; + Platform::deactivate_interval_timer(_agent_timer); + _agent_mon_config_sent = false; + delete[] _agent_msg_data; + _agent_msg_data = NULL; + _agent_msg_pos = 0; + _agent_tokens = 0; +} + +void RedClient::delete_channels() +{ + Lock lock(_channels_lock); + Channels::iterator iter = _channels.begin(); + while (!_channels.empty()) { + RedChannel *channel = *_channels.begin(); + _channels.pop_front(); + delete channel; + } +} + +RedPeer::ConnectionOptions::Type RedClient::get_connection_options(uint32_t channel_type) +{ + return _con_opt_map[channel_type]; +} + +void RedClient::connect() +{ + //todo wait for disconnect state + if (_connection_id || !abort_channels()) { + return; + } + _pixmap_cache.clear(); + _glz_window.clear(); + memset(_sync_info, 0, sizeof(_sync_info)); + _aborting = false; + _migrate.clear_channels(); + delete_channels(); + enable(); + + _con_opt_map.clear(); + PeerConnectionOptMap::const_iterator iter = _application.get_con_opt_map().begin(); + PeerConnectionOptMap::const_iterator end = _application.get_con_opt_map().end(); + for (; iter != end; iter++) { + _con_opt_map[(*iter).first] = (*iter).second; + } + RedChannel::connect(); +} + +void RedClient::disconnect() +{ + _migrate.abort(); + RedChannel::disconnect(); +} + +void RedClient::disconnect_channels() +{ + Lock lock(_channels_lock); + Channels::iterator iter = _channels.begin(); + for (; iter != _channels.end(); ++iter) { + (*iter)->RedPeer::disconnect(); + } +} + +void RedClient::on_channel_disconnected(RedChannel& channel) +{ + Lock lock(_notify_lock); + if (_notify_disconnect) { + _notify_disconnect = false; + int connection_error = channel.get_connection_error(); + if (connection_error == SPICEC_ERROR_CODE_SUCCESS) { + LOG_INFO("disconneted"); + push_event(new DisconnectedEvent()); + } else { + push_event(new CoonnectionError(connection_error)); + } + } + disconnect_channels(); + RedPeer::disconnect(); +} + +bool RedClient::abort_channels() +{ + Lock lock(_channels_lock); + Channels::iterator iter = _channels.begin(); + + for (; iter != _channels.end(); ++iter) { + if (!(*iter)->abort()) { + return false; + } + } + return true; +} + +bool RedClient::abort() +{ + if (!_aborting) { + Lock lock(_sync_lock); + _aborting = true; + _sync_condition.notify_all(); + } + _pixmap_cache.abort(); + _glz_window.abort(); + if (RedChannel::abort() && abort_channels()) { + delete_channels(); + _migrate.abort(); + return true; + } else { + return false; + } +} + +void RedClient::handle_migrate_begin(RedPeer::InMessage* message) +{ + DBG(0, ""); + RedMigrationBegin* migrate = (RedMigrationBegin*)message->data(); + //add mig channels + _migrate.start(migrate); +} + +void RedClient::handle_migrate_cancel(RedPeer::InMessage* message) +{ + _migrate.abort(); +} + +ChannelFactory* RedClient::find_factory(uint32_t type) +{ + Factorys::iterator iter = _factorys.begin(); + for (; iter != _factorys.end(); ++iter) { + if ((*iter)->type() == type) { + return *iter; + } + } + LOG_WARN("no factory for %u", type); + return NULL; +} + +void RedClient::create_channel(uint32_t type, uint32_t id) +{ + ChannelFactory* factory = find_factory(type); + if (!factory) { + return; + } + RedChannel* channel = factory->construct(*this, id); + ASSERT(channel); + Lock lock(_channels_lock); + _channels.push_back(channel); + channel->start(); + channel->connect(); + _migrate.add_channel(new MigChannel(type, id, channel->get_common_caps(), channel->get_caps())); +} + +void RedClient::send_agent_monitors_config() +{ + AutoRef<MonitorsQuery > qury(new MonitorsQuery()); + push_event(*qury); + (*qury)->wait(); + if (!(*qury)->success()) { + THROW(" monitors query failed"); + } + + double min_distance = HUGE; + int dx = 0; + int dy = 0; + int i; + + std::vector<MonitorInfo>& monitors = (*qury)->get_monitors(); + std::vector<MonitorInfo>::iterator iter = monitors.begin(); + for (; iter != monitors.end(); iter++) { + double distance = sqrt(pow((double)(*iter).position.x, 2) + pow((double)(*iter).position.y, + 2)); + if (distance < min_distance) { + min_distance = distance; + dx = -(*iter).position.x; + dy = -(*iter).position.y; + } + } + + Message* message = new Message(REDC_AGENT_DATA,sizeof(VDAgentMessage) + + sizeof(VDAgentMonitorsConfig) + + monitors.size() * sizeof(VDAgentMonConfig)); + VDAgentMessage* msg = (VDAgentMessage*)message->data(); + msg->protocol = VD_AGENT_PROTOCOL; + msg->type = VD_AGENT_MONITORS_CONFIG; + msg->opaque = 0; + msg->size = sizeof(VDAgentMonitorsConfig) + monitors.size() * sizeof(VDAgentMonConfig); + + VDAgentMonitorsConfig* mon_config = (VDAgentMonitorsConfig*)msg->data; + mon_config->num_of_monitors = monitors.size(); + mon_config->flags = 0; + if (Platform::is_monitors_pos_valid()) { + mon_config->flags = VD_AGENT_CONFIG_MONITORS_FLAG_USE_POS; + } + for (iter = monitors.begin(), i = 0; iter != monitors.end(); iter++, i++) { + mon_config->monitors[i].depth = (*iter).depth; + mon_config->monitors[i].width = (*iter).size.x; + mon_config->monitors[i].height = (*iter).size.y; + mon_config->monitors[i].x = (*iter).position.x + dx; + mon_config->monitors[i].y = (*iter).position.y + dy; + } + ASSERT(_agent_tokens) + _agent_tokens--; + post_message(message); + _agent_mon_config_sent = true; +} + +#define MIN_DISPLAY_PIXMAP_CACHE (1024 * 1024 * 20) +#define MAX_DISPLAY_PIXMAP_CACHE (1024 * 1024 * 80) +#define MIN_MEM_FOR_OTHERS (1024 * 1024 * 40) + +// tmp till the pci mem will be shared by the qxls +#define MIN_GLZ_WINDOW_SIZE (1024 * 1024 * 12) +#define MAX_GLZ_WINDOW_SIZE MIN((LZ_MAX_WINDOW_SIZE * 4), 1024 * 1024 * 64) + +void RedClient::calc_pixmap_cach_and_glz_window_size(uint32_t display_channels_hint, + uint32_t pci_mem_hint) +{ +#ifdef WIN32 + display_channels_hint = MAX(1, display_channels_hint); + int max_cache_size = display_channels_hint * MAX_DISPLAY_PIXMAP_CACHE; + int min_cache_size = display_channels_hint * MIN_DISPLAY_PIXMAP_CACHE; + + MEMORYSTATUSEX mem_status; + mem_status.dwLength = sizeof(mem_status); + + if (!GlobalMemoryStatusEx(&mem_status)) { + THROW("get mem status failed %u", GetLastError()); + } + + //ullTotalPageFile is physical memory plus the size of the page file, minus a small overhead + uint64_t free_mem = mem_status.ullAvailPageFile; + if (free_mem < (min_cache_size + MIN_MEM_FOR_OTHERS + MIN_GLZ_WINDOW_SIZE)) { + THROW_ERR(SPICEC_ERROR_CODE_NOT_ENOUGH_MEMORY, "low memory condition"); + } + free_mem -= MIN_MEM_FOR_OTHERS; + _glz_window_size = MIN(MAX_GLZ_WINDOW_SIZE, pci_mem_hint / 2); + _glz_window_size = (int)MIN(free_mem / 3, _glz_window_size); + _glz_window_size = MAX(MIN_GLZ_WINDOW_SIZE, _glz_window_size); + free_mem -= _glz_window_size; + _pixmap_cache_size = MIN(free_mem, mem_status.ullAvailVirtual); + _pixmap_cache_size = MIN(free_mem, max_cache_size); +#else + //for now + _glz_window_size = (int)MIN(MAX_GLZ_WINDOW_SIZE, pci_mem_hint / 2); + _glz_window_size = MAX(MIN_GLZ_WINDOW_SIZE, _glz_window_size); + _pixmap_cache_size = MAX_DISPLAY_PIXMAP_CACHE; +#endif + + _pixmap_cache_size /= 4; + _glz_window_size /= 4; +} + +void RedClient::on_display_mode_change() +{ +#ifdef USE_OGL + Lock lock(_channels_lock); + Channels::iterator iter = _channels.begin(); + for (; iter != _channels.end(); ++iter) { + if ((*iter)->get_type() == RED_CHANNEL_DISPLAY) { + ((DisplayChannel *)(*iter))->recreate_ogl_context(); + } + } +#endif +} + +void RedClient::set_mouse_mode(uint32_t supported_modes, uint32_t current_mode) +{ + if (current_mode != _mouse_mode) { + _mouse_mode = current_mode; + Lock lock(_channels_lock); + Channels::iterator iter = _channels.begin(); + for (; iter != _channels.end(); ++iter) { + if ((*iter)->get_type() == RED_CHANNEL_CURSOR) { + ((CursorChannel *)(*iter))->set_cursor_mode(); + } + } + } + // FIXME: use configured mouse mode (currently, use client mouse mode if supported by server) + if ((supported_modes & RED_MOUSE_MODE_CLIENT) && (current_mode != RED_MOUSE_MODE_CLIENT)) { + Message* message = new Message(REDC_MOUSE_MODE_REQUEST, sizeof(RedcMouseModeRequest)); + RedcMouseModeRequest* mouse_mode_request = (RedcMouseModeRequest*)message->data(); + mouse_mode_request->mode = RED_MOUSE_MODE_CLIENT; + post_message(message); + } +} + +void RedClient::handle_init(RedPeer::InMessage* message) +{ + RedInit *init = (RedInit *)message->data(); + _connection_id = init->session_id; + set_mm_time(init->multi_media_time); + calc_pixmap_cach_and_glz_window_size(init->display_channels_hint, init->ram_hint); + _glz_window.set_pixels_capacity(_glz_window_size); + set_mouse_mode(init->supported_mouse_modes, init->current_mouse_mode); + _agent_tokens = init->agent_tokens; + _agent_connected = !!init->agent_connected; + if (_agent_connected) { + Message* msg = new Message(REDC_AGENT_START, sizeof(RedcAgentStart)); + RedcAgentStart* agent_start = (RedcAgentStart *)msg->data(); + agent_start->num_tokens = ~0; + post_message(msg); + } + if (_auto_display_res) { + Platform::activate_interval_timer(_agent_timer, AGENT_TIMEOUT); + if (_agent_connected) { + send_agent_monitors_config(); + } + } else { + post_message(new Message(REDC_ATTACH_CHANNELS, 0)); + } +} + +void RedClient::handle_channels(RedPeer::InMessage* message) +{ + RedChannels *init = (RedChannels *)message->data(); + RedChannelInit* channels = init->channels; + for (unsigned int i = 0; i < init->num_of_channels; i++) { + create_channel(channels[i].type, channels[i].id); + } +} + +void RedClient::handle_mouse_mode(RedPeer::InMessage* message) +{ + RedMouseMode *mouse_mode = (RedMouseMode *)message->data(); + set_mouse_mode(mouse_mode->supported_modes, mouse_mode->current_mode); +} + +void RedClient::handle_mm_time(RedPeer::InMessage* message) +{ + RedMultiMediaTime *mm_time = (RedMultiMediaTime *)message->data(); + set_mm_time(mm_time->time); +} + +void RedClient::handle_agent_connected(RedPeer::InMessage* message) +{ + DBG(0, ""); + _agent_connected = true; + Message* msg = new Message(REDC_AGENT_START, sizeof(RedcAgentStart)); + RedcAgentStart* agent_start = (RedcAgentStart *)msg->data(); + agent_start->num_tokens = ~0; + post_message(msg); + if (_auto_display_res && !_agent_mon_config_sent) { + send_agent_monitors_config(); + } +} + +void RedClient::handle_agent_disconnected(RedPeer::InMessage* message) +{ + DBG(0, ""); + _agent_connected = false; +} + +void RedClient::on_agent_reply(VDAgentReply* reply) +{ + switch (reply->error) { + case VD_AGENT_SUCCESS: + break; + case VD_AGENT_ERROR: + THROW_ERR(SPICEC_ERROR_CODE_AGENT_ERROR, "vdagent error"); + default: + THROW("unknown vdagent error"); + } + switch (reply->type) { + case VD_AGENT_MONITORS_CONFIG: + post_message(new Message(REDC_ATTACH_CHANNELS, 0)); + Platform::deactivate_interval_timer(_agent_timer); + break; + default: + THROW("unexpected vdagent reply type"); + } +} + +void RedClient::handle_agent_data(RedPeer::InMessage* message) +{ + uint32_t msg_size = message->size(); + uint8_t* msg_pos = message->data(); + uint32_t n; + + DBG(0, ""); + while (msg_size) { + if (_agent_msg_pos < sizeof(VDAgentMessage)) { + n = MIN(sizeof(VDAgentMessage) - _agent_msg_pos, msg_size); + memcpy((uint8_t*)_agent_msg + _agent_msg_pos, msg_pos, n); + _agent_msg_pos += n; + msg_size -= n; + msg_pos += n; + if (_agent_msg_pos == sizeof(VDAgentMessage)) { + if (_agent_msg->protocol != VD_AGENT_PROTOCOL) { + THROW("Invalid protocol %u", _agent_msg->protocol); + } + _agent_msg_data = new uint8_t[_agent_msg->size]; + } + } + if (_agent_msg_pos >= sizeof(VDAgentMessage)) { + n = MIN(sizeof(VDAgentMessage) + _agent_msg->size - _agent_msg_pos, msg_size); + memcpy(_agent_msg_data + _agent_msg_pos - sizeof(VDAgentMessage), msg_pos, n); + _agent_msg_pos += n; + msg_size -= n; + msg_pos += n; + } + if (_agent_msg_pos == sizeof(VDAgentMessage) + _agent_msg->size) { + switch (_agent_msg->type) { + case VD_AGENT_REPLY: { + on_agent_reply((VDAgentReply*)_agent_msg_data); + break; + } + default: + DBG(0, "Unsupported message type %u size %u", _agent_msg->type, _agent_msg->size); + } + delete[] _agent_msg_data; + _agent_msg_data = NULL; + _agent_msg_pos = 0; + } + } +} + +void RedClient::handle_agent_tokens(RedPeer::InMessage* message) +{ + RedAgentTokens *token = (RedAgentTokens *)message->data(); + _agent_tokens += token->num_tokens; +} + +void RedClient::migrate_channel(RedChannel& channel) +{ + DBG(0, "channel type %u id %u", channel.get_type(), channel.get_id()); + _migrate.swap_peer(channel); +} + +void RedClient::get_sync_info(uint8_t channel_type, uint8_t channel_id, SyncInfo& info) +{ + info.lock = &_sync_lock; + info.condition = &_sync_condition; + info.message_serial = &_sync_info[channel_type][channel_id]; +} + +void RedClient::wait_for_channels(int wait_list_size, RedWaitForChannel* wait_list) +{ + for (int i = 0; i < wait_list_size; i++) { + if (wait_list[i].channel_type >= RED_CHANNEL_END) { + THROW("invalid channel type %u", wait_list[i].channel_type); + } + uint64_t& sync_cell = _sync_info[wait_list[i].channel_type][wait_list[i].channel_id]; +#ifndef RED64 + Lock lock(_sync_lock); +#endif + if (sync_cell >= wait_list[i].message_serial) { + continue; + } +#ifdef RED64 + Lock lock(_sync_lock); +#endif + for (;;) { + if (sync_cell >= wait_list[i].message_serial) { + break; + } + if (_aborting) { + THROW("aborting"); + } + _sync_condition.wait(lock); + continue; + } + } +} + +void RedClient::set_mm_time(uint32_t time) +{ + Lock lock(_mm_clock_lock); + _mm_clock_last_update = Platform::get_monolithic_time(); + _mm_time = time; +} + +uint32_t RedClient::get_mm_time() +{ + Lock lock(_mm_clock_lock); + return uint32_t((Platform::get_monolithic_time() - _mm_clock_last_update) / 1000 / 1000 + + _mm_time); +} + +void RedClient::register_channel_factory(ChannelFactory& factory) +{ + _factorys.push_back(&factory); +} + |