summaryrefslogtreecommitdiff
path: root/ipc.c
diff options
context:
space:
mode:
Diffstat (limited to 'ipc.c')
-rw-r--r--ipc.c230
1 files changed, 230 insertions, 0 deletions
diff --git a/ipc.c b/ipc.c
new file mode 100644
index 0000000..58ebe65
--- /dev/null
+++ b/ipc.c
@@ -0,0 +1,230 @@
+/*
+ * IPC mini library
+ * Copyright (C) 2014 Fluendo S.A.
+ * @author: Josep Torra <josep@fluendo.com>
+ * *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "ipc.h"
+
+IPCMessage *
+ipc_message_new (IPCMsgType type)
+{
+ IPCMessage *message;
+
+ message = g_slice_new (IPCMessage);
+
+ message->refcount = 1;
+ message->type = type;
+ message->result = NULL;
+ message->delivered = 0;
+ message->synchronous = TRUE;
+
+ return message;
+}
+
+IPCMessage *
+ipc_message_ref (IPCMessage * message)
+{
+ g_return_val_if_fail (message != NULL, NULL);
+
+ g_atomic_int_inc (&message->refcount);
+
+ return message;
+}
+
+IPCMessage *
+ipc_message_unref (IPCMessage * message)
+{
+ g_return_val_if_fail (message != NULL, NULL);
+
+ if (g_atomic_int_dec_and_test (&message->refcount)) {
+ g_slice_free (IPCMessage, message);
+ message = NULL;
+ }
+
+ return message;
+}
+
+IPCMessage *
+ipc_message_eos_new()
+{
+ return ipc_message_new (IPC_MSG_TYPE_EOS);
+}
+
+IPCMessage *
+ipc_message_request_pool_new (guint size, guint width, guint height)
+{
+ IPCMessage * message = ipc_message_new (IPC_MSG_TYPE_REQUEST_POOL);
+ message->u.request_pool.size = size;
+ message->u.request_pool.width = width;
+ message->u.request_pool.height = height;
+
+ return message;
+}
+
+gboolean
+ipc_message_request_pool_parse (IPCMessage * message, guint * size,
+ guint * width, guint * height)
+{
+ g_return_val_if_fail (message != NULL, FALSE);
+
+ if (size) {
+ *size = message->u.request_pool.size;
+ }
+
+ if (width) {
+ *width = message->u.request_pool.width;
+ }
+
+ if (height) {
+ *height = message->u.request_pool.height;
+ }
+
+ return TRUE;
+}
+
+IPCMessage *
+ipc_message_render_frame_new (EGLImageMemory * mem)
+{
+ IPCMessage * message = ipc_message_new (IPC_MSG_TYPE_REQUEST_POOL);
+ message->u.render_frame.mem = mem;
+ message->synchronous = FALSE;
+
+ return message;
+}
+
+gboolean
+ipc_message_render_frame_parse (IPCMessage * message, EGLImageMemory ** mem)
+{
+ g_return_val_if_fail (message != NULL, FALSE);
+
+ if (mem) {
+ *mem = message->u.render_frame.mem;
+ }
+
+ return TRUE;
+}
+
+static inline void
+ipc_message_delivered_set (IPCMessage * message, gboolean delivered)
+{
+ g_atomic_int_set (&message->delivered, (delivered ? 1 : 0));
+}
+
+static inline gboolean
+ipc_message_delivered_get (IPCMessage * message)
+{
+ return (gboolean) g_atomic_int_get (&message->delivered);
+}
+
+IPCChannel *
+ipc_channel_new ()
+{
+ IPCChannel *channel;
+
+ channel = g_slice_new (IPCChannel);
+
+ channel->refcount = 1;
+ channel->queue =
+ g_async_queue_new_full ((GDestroyNotify) ipc_message_unref);
+ channel->lock = g_mutex_new ();
+ channel->cond = g_cond_new ();
+
+ return channel;
+}
+
+IPCChannel *
+ipc_channel_ref (IPCChannel * channel)
+{
+ g_return_val_if_fail (channel != NULL, NULL);
+
+ g_atomic_int_inc (&channel->refcount);
+
+ return channel;
+}
+
+IPCChannel *
+ipc_channel_unref (IPCChannel * channel)
+{
+ g_return_val_if_fail (channel != NULL, NULL);
+
+ if (g_atomic_int_dec_and_test (&channel->refcount)) {
+ g_async_queue_unref (channel->queue);
+ g_mutex_free (channel->lock);
+ g_cond_free (channel->cond);
+ g_slice_free (IPCChannel, channel);
+ channel = NULL;
+ }
+
+ return channel;
+}
+
+gpointer
+ipc_channel_send (IPCChannel * channel, IPCMessage * message)
+{
+ gpointer result = NULL;
+
+ g_return_val_if_fail (channel != NULL, NULL);
+ g_return_val_if_fail (message != NULL, NULL);
+
+ g_mutex_lock (channel->lock);
+
+ g_async_queue_push (channel->queue, ipc_message_ref (message));
+
+ if (message->synchronous) {
+ /* Waiting for message to be delivered */
+ do {
+ g_cond_wait (channel->cond, channel->lock);
+ } while (!ipc_message_delivered_get (message));
+ }
+
+ result = message->result;
+
+ g_mutex_unlock (channel->lock);
+ ipc_message_unref (message);
+
+ return result;
+}
+
+IPCMessage *
+ipc_channel_try_pop (IPCChannel * channel)
+{
+ IPCMessage * result;
+
+ g_mutex_lock (channel->lock);
+ result = g_async_queue_try_pop (channel->queue);
+ g_mutex_unlock (channel->lock);
+
+ return result;
+}
+
+void
+ipc_channel_delivered (IPCChannel * channel, IPCMessage * message,
+ gpointer result)
+{
+ message->result = result;
+ ipc_message_delivered_set (message, TRUE);
+
+ g_mutex_lock (channel->lock);
+ if (message->synchronous) {
+ g_cond_broadcast (channel->cond);
+ }
+ g_mutex_unlock (channel->lock);
+ ipc_message_unref (message);
+}
+