summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnon Gilboa <agilboa@redhat.com>2012-09-10 09:48:46 +0300
committerArnon Gilboa <agilboa@redhat.com>2012-09-10 09:48:46 +0300
commit4e95b73ecf11000b23cd506fc70a686baf83df5c (patch)
tree69ec6d6ddc31b5e3fdbc093b8c520e1deb4770d8
parent04a28a35edaa26fb5264298b1413640e8977093c (diff)
vdagent: add message_queue for messages written to pipe
This is only part of the message corruption solution. The other part is fixing virtio-serial / spice-qemu-char throttling code. -replace write_[lock/unlock/completion] calls with [new/enqueue]_message -remove clipboard specific _out_msg_* class members -remove ugly loop - while (a->_out_msg && a->write_clipboard()); -add _message_mutex for message queue -fix pending_write race using _write_mutex -TODO: enqueue large message without dividing it to chunks in advance rhbz #846427
-rw-r--r--vdagent/vdagent.cpp186
1 files changed, 91 insertions, 95 deletions
diff --git a/vdagent/vdagent.cpp b/vdagent/vdagent.cpp
index b8bad44..3ffafe3 100644
--- a/vdagent/vdagent.cpp
+++ b/vdagent/vdagent.cpp
@@ -94,10 +94,10 @@ private:
enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH };
void set_control_event(int control_command);
void handle_control_event();
- uint8_t* write_lock(DWORD bytes = 0);
- void write_unlock(DWORD bytes = 0);
+ VDPipeMessage* new_message(DWORD bytes = 0);
+ void enqueue_message(VDPipeMessage* msg);
bool write_message(uint32_t type, uint32_t size, void* data);
- bool write_clipboard();
+ bool write_clipboard(VDAgentMessage* msg, uint32_t size);
bool connect_pipe();
bool send_input();
void set_display_depth(uint32_t depth);
@@ -119,9 +119,6 @@ private:
HANDLE _clipboard_event;
VDAgentMessage* _in_msg;
uint32_t _in_msg_pos;
- VDAgentMessage* _out_msg;
- uint32_t _out_msg_pos;
- uint32_t _out_msg_size;
bool _pending_input;
bool _pending_write;
bool _running;
@@ -131,7 +128,9 @@ private:
VDPipeState _pipe_state;
mutex_t _write_mutex;
mutex_t _control_mutex;
+ mutex_t _message_mutex;
std::queue<int> _control_queue;
+ std::queue<VDPipeMessage*> _message_queue;
bool _logon_desktop;
bool _display_setting_initialized;
@@ -167,9 +166,6 @@ VDAgent::VDAgent()
, _clipboard_event (NULL)
, _in_msg (NULL)
, _in_msg_pos (0)
- , _out_msg (NULL)
- , _out_msg_pos (0)
- , _out_msg_size (0)
, _pending_input (false)
, _pending_write (false)
, _running (false)
@@ -193,6 +189,7 @@ VDAgent::VDAgent()
ZeroMemory(&_pipe_state, sizeof(VDPipeState));
MUTEX_INIT(_write_mutex);
MUTEX_INIT(_control_mutex);
+ MUTEX_INIT(_message_mutex);
_singleton = this;
}
@@ -538,7 +535,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port
}
DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
- reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
+ reply_pipe_msg = new_message(msg_size);
if (!reply_pipe_msg) {
return false;
}
@@ -553,10 +550,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port
reply = (VDAgentReply*)reply_msg->data;
reply->type = VD_AGENT_MONITORS_CONFIG;
reply->error = display_count ? VD_AGENT_SUCCESS : VD_AGENT_ERROR;
- write_unlock(msg_size);
- if (!_pending_write) {
- write_completion(0, 0, &_pipe_state.write.overlap);
- }
+ enqueue_message(reply_pipe_msg);
return true;
}
@@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool request)
uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities) + VD_AGENT_CAPS_BYTES;
msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size;
- caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
+ caps_pipe_msg = new_message(msg_size);
if (!caps_pipe_msg) {
return false;
}
@@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool request)
for (uint32_t i = 0 ; i < caps_size; ++i) {
vd_printf("%X", caps->caps[i]);
}
- write_unlock(msg_size);
- if (!_pending_write) {
- write_completion(0, 0, &_pipe_state.write.overlap);
- }
+ enqueue_message(caps_pipe_msg);
return true;
}
@@ -750,11 +741,10 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32
}
msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
- reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
+ reply_pipe_msg = new_message(msg_size);
if (!reply_pipe_msg) {
return false;
}
-
reply_pipe_msg->type = VD_AGENT_COMMAND;
reply_pipe_msg->opaque = port;
reply_pipe_msg->size = sizeof(VDAgentMessage) + sizeof(VDAgentReply);
@@ -766,10 +756,7 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32
reply = (VDAgentReply*)reply_msg->data;
reply->type = VD_AGENT_DISPLAY_CONFIG;
reply->error = VD_AGENT_SUCCESS;
- write_unlock(msg_size);
- if (!_pending_write) {
- write_completion(0, 0, &_pipe_state.write.overlap);
- }
+ enqueue_message(reply_pipe_msg);
return true;
}
@@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage* msg)
switch (msg->type) {
case VD_AGENT_RESET: {
vd_printf("Agent reset");
- VDPipeMessage* ack = (VDPipeMessage*)write_lock(sizeof(VDPipeMessage));
+ VDPipeMessage* ack = new_message(sizeof(VDPipeMessage));
if (!ack) {
return false;
}
ack->type = VD_AGENT_RESET_ACK;
ack->opaque = msg->opaque;
- write_unlock(sizeof(VDPipeMessage));
- if (!_pending_write) {
- write_completion(0, 0, &_pipe_state.write.overlap);
- }
+ enqueue_message(ack);
break;
}
case VD_AGENT_SESSION_LOGON:
@@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage* msg)
//FIXME: division to max size chunks should NOT be here, but in the service
// here we should write the max possible size to the pipe
-bool VDAgent::write_clipboard()
+bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size)
{
- ASSERT(_out_msg);
- DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size - _out_msg_pos, VD_AGENT_MAX_DATA_SIZE);
- VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n);
- if (!pipe_msg) {
- return false;
- }
- pipe_msg->type = VD_AGENT_COMMAND;
- pipe_msg->opaque = VDP_CLIENT_PORT;
- pipe_msg->size = n - sizeof(VDPipeMessage);
- memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n - sizeof(VDPipeMessage));
- write_unlock(n);
- if (!_pending_write) {
- write_completion(0, 0, &_pipe_state.write.overlap);
- }
- _out_msg_pos += (n - sizeof(VDPipeMessage));
- if (_out_msg_pos == _out_msg_size) {
- delete[] (uint8_t *)_out_msg;
- _out_msg = NULL;
- _out_msg_size = 0;
- _out_msg_pos = 0;
- }
- return true;
+ uint32_t pos = 0;
+ bool ret = true;
+
+ ASSERT(msg && size);
+ //FIXME: do it smarter - no loop, no memcopy
+ MUTEX_LOCK(_message_mutex);
+ while (pos < size) {
+ DWORD n = MIN(sizeof(VDPipeMessage) + size - pos, VD_AGENT_MAX_DATA_SIZE);
+ VDPipeMessage* pipe_msg = new_message(n);
+ if (!pipe_msg) {
+ ret = false;
+ break;
+ }
+ pipe_msg->type = VD_AGENT_COMMAND;
+ pipe_msg->opaque = VDP_CLIENT_PORT;
+ pipe_msg->size = n - sizeof(VDPipeMessage);
+ memcpy(pipe_msg->data, (char*)msg + pos, n - sizeof(VDPipeMessage));
+ enqueue_message(pipe_msg);
+ pos += (n - sizeof(VDPipeMessage));
+ }
+ MUTEX_UNLOCK(_message_mutex);
+ return ret;
}
bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
@@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
VDPipeMessage* pipe_msg;
VDAgentMessage* msg;
- pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE + size);
+ pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size);
if (!pipe_msg) {
return false;
}
@@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
if (size && data) {
memcpy(msg->data, data, size);
}
- write_unlock(VD_MESSAGE_HEADER_SIZE + size);
- if (!_pending_write) {
- write_completion(0, 0, &_pipe_state.write.overlap);
- }
+ enqueue_message(pipe_msg);
return true;
}
@@ -993,6 +974,8 @@ bool VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab, uint32
// VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the request failed.
bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_request)
{
+ VDAgentMessage* msg;
+ uint32_t msg_size;
UINT format;
HANDLE clip_data;
uint8_t* new_data = NULL;
@@ -1008,10 +991,6 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
vd_printf("Unsupported clipboard type %u", clipboard_request->type);
return false;
}
- if (_out_msg) {
- vd_printf("clipboard change is already pending");
- return false;
- }
if (!IsClipboardFormatAvailable(format) || !OpenClipboard(_hwnd)) {
return false;
}
@@ -1047,14 +1026,13 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
CloseClipboard();
return false;
}
- _out_msg_pos = 0;
- _out_msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size;
- _out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size];
- _out_msg->protocol = VD_AGENT_PROTOCOL;
- _out_msg->type = VD_AGENT_CLIPBOARD;
- _out_msg->opaque = 0;
- _out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
- VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data;
+ msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size;
+ msg = (VDAgentMessage*)new uint8_t[msg_size];
+ msg->protocol = VD_AGENT_PROTOCOL;
+ msg->type = VD_AGENT_CLIPBOARD;
+ msg->opaque = 0;
+ msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
+ VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data;
clipboard->type = clipboard_request->type;
switch (clipboard_request->type) {
@@ -1070,7 +1048,8 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
break;
}
CloseClipboard();
- write_clipboard();
+ write_clipboard(msg, msg_size);
+ delete[] (uint8_t *)msg;
return true;
}
@@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove
{
VDAgent* a = _singleton;
VDPipeState* ps = &a->_pipe_state;
+ DWORD size_left;
- a->_pending_write = false;
if (!a->_running) {
return;
}
@@ -1291,40 +1270,57 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove
a->_running = false;
return;
}
- if (!a->write_lock()) {
- a->_running = false;
- return;
- }
+ MUTEX_LOCK(a->_write_mutex);
ps->write.start += bytes;
if (ps->write.start == ps->write.end) {
ps->write.start = ps->write.end = 0;
- //DEBUG
- while (a->_out_msg && a->write_clipboard());
- } else if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
- ps->write.end - ps->write.start, overlap, write_completion)) {
- a->_pending_write = true;
+ }
+
+ MUTEX_LOCK(a->_message_mutex);
+ size_left = sizeof(a->_pipe_state.write.data) - a->_pipe_state.write.end;
+ while (!a->_message_queue.empty()) {
+ VDPipeMessage* msg = a->_message_queue.front();
+ DWORD size = sizeof(VDPipeMessage) + msg->size;
+
+ if (size > size_left) {
+ break;
+ }
+ a->_message_queue.pop();
+ memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end, msg, size);
+ a->_pipe_state.write.end += size;
+ size_left -= size;
+ delete msg;
+ }
+ MUTEX_UNLOCK(a->_message_mutex);
+
+ if (ps->write.start < ps->write.end) {
+ if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
+ ps->write.end - ps->write.start, overlap, write_completion)) {
+ a->_pending_write = true;
+ } else {
+ vd_printf("WriteFileEx() failed: %lu", GetLastError());
+ a->_running = false;
+ }
} else {
- vd_printf("WriteFileEx() failed: %lu", GetLastError());
- a->_running = false;
+ a->_pending_write = false;
}
- a->write_unlock();
+ MUTEX_UNLOCK(a->_write_mutex);
}
-uint8_t* VDAgent::write_lock(DWORD bytes)
+VDPipeMessage* VDAgent::new_message(DWORD bytes)
{
- MUTEX_LOCK(_write_mutex);
- if (_pipe_state.write.end + bytes <= sizeof(_pipe_state.write.data)) {
- return &_pipe_state.write.data[_pipe_state.write.end];
- } else {
- MUTEX_UNLOCK(_write_mutex);
- vd_printf("write buffer is full");
- return NULL;
- }
+ return (VDPipeMessage*)(new char[bytes]);
}
-void VDAgent::write_unlock(DWORD bytes)
+void VDAgent::enqueue_message(VDPipeMessage* msg)
{
- _pipe_state.write.end += bytes;
+ MUTEX_LOCK(_message_mutex);
+ _message_queue.push(msg);
+ MUTEX_UNLOCK(_message_mutex);
+ MUTEX_LOCK(_write_mutex);
+ if (!_pending_write) {
+ write_completion(0, 0, &_pipe_state.write.overlap);
+ }
MUTEX_UNLOCK(_write_mutex);
}