summaryrefslogtreecommitdiff
path: root/sys/shm
diff options
context:
space:
mode:
authorOlivier CrĂȘte <olivier.crete@collabora.co.uk>2010-06-03 14:42:06 -0400
committerOlivier CrĂȘte <olivier.crete@collabora.co.uk>2010-06-03 15:27:43 -0400
commit21686e30779b225e903fb7a6e90e9c5a81c7c9d0 (patch)
treec51039c8d942a59f72cebafffde6f6dffffc8d10 /sys/shm
parent536e79befd17d62aba4eaa6308fef6128dd5fa09 (diff)
shm: Move to sys/ since it doesn't exist on windows
Diffstat (limited to 'sys/shm')
-rw-r--r--sys/shm/Makefile.am13
-rw-r--r--sys/shm/gstshm.c42
-rw-r--r--sys/shm/gstshmsink.c602
-rw-r--r--sys/shm/gstshmsink.h76
-rw-r--r--sys/shm/gstshmsrc.c386
-rw-r--r--sys/shm/gstshmsrc.h76
-rw-r--r--sys/shm/shmalloc.c153
-rw-r--r--sys/shm/shmalloc.h47
-rw-r--r--sys/shm/shmpipe.c831
-rw-r--r--sys/shm/shmpipe.h78
10 files changed, 2304 insertions, 0 deletions
diff --git a/sys/shm/Makefile.am b/sys/shm/Makefile.am
new file mode 100644
index 000000000..fdc034b71
--- /dev/null
+++ b/sys/shm/Makefile.am
@@ -0,0 +1,13 @@
+glib_enum_prefix = gst_shm
+
+include $(top_srcdir)/common/glib-gen.mak
+
+plugin_LTLIBRARIES = libgstshm.la
+
+libgstshm_la_SOURCES = shmpipe.c shmalloc.c gstshm.c gstshmsrc.c gstshmsink.c
+libgstshm_la_CFLAGS = $(GST_CFLAGS)
+libgstshm_la_LIBADD = -lrt
+libgstshm_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS)
+libgstshm_la_LIBTOOLFLAGS = --tag=disable-static
+
+noinst_HEADERS = gstshmsrc.h gstshmsink.h shmpipe.h shmalloc.h
diff --git a/sys/shm/gstshm.c b/sys/shm/gstshm.c
new file mode 100644
index 000000000..d57db6755
--- /dev/null
+++ b/sys/shm/gstshm.c
@@ -0,0 +1,42 @@
+/* GStreamer
+ * Copyright (C) <2009> Collabora Ltd
+ * @author: Olivier Crete <olivier.crete@collabora.co.uk
+ * Copyright (C) <2009> Nokia Inc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstshmsrc.h"
+#include "gstshmsink.h"
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ return gst_element_register (plugin, "shmsrc",
+ GST_RANK_NONE, GST_TYPE_SHM_SRC) &&
+ gst_element_register (plugin, "shmsink",
+ GST_RANK_NONE, GST_TYPE_SHM_SINK);
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ "shm",
+ "shared memory sink source",
+ plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/sys/shm/gstshmsink.c b/sys/shm/gstshmsink.c
new file mode 100644
index 000000000..ba8608fa9
--- /dev/null
+++ b/sys/shm/gstshmsink.c
@@ -0,0 +1,602 @@
+/* GStreamer
+ * Copyright (C) <2009> Collabora Ltd
+ * @author: Olivier Crete <olivier.crete@collabora.co.uk
+ * Copyright (C) <2009> Nokia Inc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstshmsink.h"
+
+#include <gst/gst.h>
+
+#include <string.h>
+
+/* signals */
+enum
+{
+ SIGNAL_CLIENT_CONNECTED,
+ SIGNAL_CLIENT_DISCONNECTED,
+ LAST_SIGNAL
+};
+
+/* properties */
+enum
+{
+ PROP_0,
+ PROP_SOCKET_PATH,
+ PROP_PERMS,
+ PROP_SHM_SIZE,
+ PROP_WAIT_FOR_CONNECTION
+};
+
+struct GstShmClient
+{
+ ShmClient *client;
+ GstPollFD pollfd;
+};
+
+#define DEFAULT_SIZE ( 256 * 1024 )
+#define DEFAULT_WAIT_FOR_CONNECTION (TRUE)
+#define DEFAULT_PERMS (S_IRWXU | S_IRWXG)
+
+
+GST_DEBUG_CATEGORY_STATIC (shmsink_debug);
+#define GST_CAT_DEFAULT shmsink_debug
+
+static const GstElementDetails gst_shm_sink_details =
+GST_ELEMENT_DETAILS ("Shared Memory Sink",
+ "Sink",
+ "Send data over shared memory to the matching source",
+ "Olivier Crete <olivier.crete@collabora.co.uk>");
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+GST_BOILERPLATE (GstShmSink, gst_shm_sink, GstBaseSink, GST_TYPE_BASE_SINK);
+
+static void gst_shm_sink_finalize (GObject * object);
+static void gst_shm_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_shm_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+static gboolean gst_shm_sink_start (GstBaseSink * bsink);
+static gboolean gst_shm_sink_stop (GstBaseSink * bsink);
+static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf);
+static GstFlowReturn gst_shm_sink_buffer_alloc (GstBaseSink * sink,
+ guint64 offset, guint size, GstCaps * caps, GstBuffer ** out_buf);
+
+static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
+static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
+static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink);
+
+static gpointer pollthread_func (gpointer data);
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+static void
+gst_shm_sink_base_init (gpointer g_class)
+{
+ GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&sinktemplate));
+
+ gst_element_class_set_details (element_class, &gst_shm_sink_details);
+}
+
+
+static void
+gst_shm_sink_init (GstShmSink * self, GstShmSinkClass * g_class)
+{
+ self->cond = g_cond_new ();
+ self->size = DEFAULT_SIZE;
+ self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION;
+ self->perms = DEFAULT_PERMS;
+}
+
+static void
+gst_shm_sink_class_init (GstShmSinkClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+ GstBaseSinkClass *gstbasesink_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+ gstbasesink_class = (GstBaseSinkClass *) klass;
+
+ gobject_class->finalize = gst_shm_sink_finalize;
+ gobject_class->set_property = gst_shm_sink_set_property;
+ gobject_class->get_property = gst_shm_sink_get_property;
+
+ gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start);
+ gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop);
+ gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render);
+ gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
+ gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
+ gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop);
+ gstbasesink_class->buffer_alloc =
+ GST_DEBUG_FUNCPTR (gst_shm_sink_buffer_alloc);
+
+ g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
+ g_param_spec_string ("socket-path",
+ "Path to the control socket",
+ "The path to the control socket used to control the shared memory"
+ " transport", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_PERMS,
+ g_param_spec_uint ("perms",
+ "Permissions on the shm area",
+ "Permissions to set on the shm area",
+ 0, 07777, DEFAULT_PERMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_SHM_SIZE,
+ g_param_spec_uint ("shm-size",
+ "Size of the shm area",
+ "Size of the shared memory area",
+ 0, G_MAXUINT, DEFAULT_SIZE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
+ g_param_spec_boolean ("wait-for-connection",
+ "Wait for a connection until rendering",
+ "Block the stream until the shm pipe is connected",
+ DEFAULT_WAIT_FOR_CONNECTION,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected",
+ GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL,
+ g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+
+ signals[SIGNAL_CLIENT_DISCONNECTED] = g_signal_new ("client-disconnected",
+ GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL,
+ g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+
+ GST_DEBUG_CATEGORY_INIT (shmsink_debug, "shmsink", 0, "Shared Memory Sink");
+}
+
+static void
+gst_shm_sink_finalize (GObject * object)
+{
+ GstShmSink *self = GST_SHM_SINK (object);
+
+ g_cond_free (self->cond);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/*
+ * Set the value of a property for the server sink.
+ */
+static void
+gst_shm_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstShmSink *self = GST_SHM_SINK (object);
+ int ret = 0;
+
+ switch (prop_id) {
+ case PROP_SOCKET_PATH:
+ GST_OBJECT_LOCK (object);
+ g_free (self->socket_path);
+ self->socket_path = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (object);
+ break;
+ case PROP_PERMS:
+ GST_OBJECT_LOCK (object);
+ self->perms = g_value_get_uint (value);
+ if (self->pipe)
+ ret = sp_writer_setperms_shm (self->pipe, self->perms);
+ GST_OBJECT_UNLOCK (object);
+ if (ret < 0)
+ GST_WARNING_OBJECT (object, "Could not set permissions on pipe: %s",
+ strerror (ret));
+ break;
+ case PROP_SHM_SIZE:
+ GST_OBJECT_LOCK (object);
+ if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0)
+ GST_DEBUG_OBJECT (self,
+ "Resize shared memory area from %u to %u bytes");
+ else
+ GST_WARNING_OBJECT (self,
+ "Could not resize shared memory area from %u to %u bytes");
+ self->size = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (object);
+ break;
+ case PROP_WAIT_FOR_CONNECTION:
+ GST_OBJECT_LOCK (object);
+ self->wait_for_connection = g_value_get_boolean (value);
+ GST_OBJECT_UNLOCK (object);
+ g_cond_broadcast (self->cond);
+ break;
+ default:
+ break;
+ }
+}
+
+static void
+gst_shm_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstShmSink *self = GST_SHM_SINK (object);
+
+ GST_OBJECT_LOCK (object);
+
+ switch (prop_id) {
+ case PROP_SOCKET_PATH:
+ g_value_set_string (value, self->socket_path);
+ break;
+ case PROP_PERMS:
+ self->perms = g_value_get_uint (value);
+ if (self->pipe) {
+ int ret;
+
+ ret = sp_writer_setperms_shm (self->pipe, self->perms);
+ if (ret < 0)
+ GST_WARNING_OBJECT (object, "Could not set permissions on pipe: %s",
+ strerror (ret));
+ }
+ break;
+ case PROP_SHM_SIZE:
+ g_value_set_uint (value, self->size);
+ break;
+ case PROP_WAIT_FOR_CONNECTION:
+ g_value_set_boolean (value, self->wait_for_connection);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+
+ GST_OBJECT_UNLOCK (object);
+}
+
+
+
+static gboolean
+gst_shm_sink_start (GstBaseSink * bsink)
+{
+ GstShmSink *self = GST_SHM_SINK (bsink);
+
+ self->stop = FALSE;
+
+ if (!self->socket_path) {
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
+ ("Could not open socket."), (NULL));
+ return FALSE;
+ }
+
+ GST_DEBUG_OBJECT (self, "Creating new socket at %s"
+ " with shared memory of %d bytes", self->socket_path, self->size);
+
+ self->pipe = sp_writer_create (self->socket_path, self->size, self->perms);
+
+ if (!self->pipe) {
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
+ ("Could not open socket."), (NULL));
+ return FALSE;
+ }
+
+ g_free (self->socket_path);
+ self->socket_path = g_strdup (sp_writer_get_path (self->pipe));
+
+ GST_DEBUG ("Created socket at %s", self->socket_path);
+
+ self->poll = gst_poll_new (TRUE);
+ gst_poll_fd_init (&self->serverpollfd);
+ self->serverpollfd.fd = sp_get_fd (self->pipe);
+ gst_poll_add_fd (self->poll, &self->serverpollfd);
+ gst_poll_fd_ctl_read (self->poll, &self->serverpollfd, TRUE);
+
+ self->pollthread = g_thread_create (pollthread_func, self, TRUE, NULL);
+
+ if (!self->pollthread)
+ goto thread_error;
+
+ return TRUE;
+
+thread_error:
+
+ sp_close (self->pipe);
+ self->pipe = NULL;
+ gst_poll_free (self->poll);
+
+ GST_ELEMENT_ERROR (self, CORE, THREAD, ("Could not srart thread"), (NULL));
+ return FALSE;
+}
+
+
+static gboolean
+gst_shm_sink_stop (GstBaseSink * bsink)
+{
+ GstShmSink *self = GST_SHM_SINK (bsink);
+
+ self->stop = TRUE;
+ gst_poll_set_flushing (self->poll, TRUE);
+
+ g_thread_join (self->pollthread);
+ self->pollthread = NULL;
+
+ GST_DEBUG_OBJECT (self, "Stopping");
+
+ while (self->clients) {
+ struct GstShmClient *client = self->clients->data;
+ self->clients = g_list_remove (self->clients, client);
+ sp_writer_close_client (self->pipe, client->client);
+ g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
+ client->pollfd.fd);
+ g_slice_free (struct GstShmClient, client);
+ }
+
+ gst_poll_free (self->poll);
+ self->poll = NULL;
+
+ sp_close (self->pipe);
+ self->pipe = NULL;
+
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
+{
+ GstShmSink *self = GST_SHM_SINK (bsink);
+ int rv;
+
+ GST_OBJECT_LOCK (self);
+ while (self->wait_for_connection && !self->clients) {
+ g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self));
+ if (self->unlock) {
+ GST_OBJECT_UNLOCK (self);
+ return GST_FLOW_WRONG_STATE;
+ }
+ }
+
+ rv = sp_writer_send_buf (self->pipe, (char *) GST_BUFFER_DATA (buf),
+ GST_BUFFER_SIZE (buf));
+
+ if (rv == -1) {
+ ShmBlock *block = NULL;
+ gchar *shmbuf = NULL;
+ while ((block = sp_writer_alloc_block (self->pipe,
+ GST_BUFFER_SIZE (buf))) == NULL) {
+ g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self));
+ if (self->unlock) {
+ GST_OBJECT_UNLOCK (self);
+ return GST_FLOW_WRONG_STATE;
+ }
+ }
+ while (self->wait_for_connection && !self->clients) {
+ g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self));
+ if (self->unlock) {
+ sp_writer_free_block (block);
+ GST_OBJECT_UNLOCK (self);
+ return GST_FLOW_WRONG_STATE;
+ }
+ }
+
+
+ shmbuf = sp_writer_block_get_buf (block);
+ memcpy (shmbuf, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
+ sp_writer_send_buf (self->pipe, shmbuf, GST_BUFFER_SIZE (buf));
+ sp_writer_free_block (block);
+ }
+
+ GST_OBJECT_UNLOCK (self);
+
+ return GST_FLOW_OK;
+}
+
+void
+gst_shm_sink_free_buffer (gpointer data)
+{
+ ShmBlock *block = data;
+ sp_writer_free_block (block);
+}
+
+static GstFlowReturn
+gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
+ GstCaps * caps, GstBuffer ** out_buf)
+{
+ GstShmSink *self = GST_SHM_SINK (sink);
+ GstBuffer *buffer;
+ ShmBlock *block = NULL;
+ gpointer buf = NULL;
+
+ GST_OBJECT_LOCK (self);
+ block = sp_writer_alloc_block (self->pipe, size);
+ if (block)
+ buf = sp_writer_block_get_buf (block);
+ GST_OBJECT_UNLOCK (self);
+
+ if (block) {
+ buffer = gst_buffer_new ();
+ GST_BUFFER_DATA (buffer) = buf;
+ GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block;
+ GST_BUFFER_FREE_FUNC (buffer) =
+ GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer);
+ GST_BUFFER_SIZE (buffer) = size;
+ GST_LOG_OBJECT (self,
+ "Allocated buffer of %lu bytes from shared memory at %p", size, buf);
+ } else {
+ buffer = gst_buffer_new_and_alloc (size);
+ GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %lu bytes, "
+ "allocating using standard allocator", size);
+ }
+
+ GST_BUFFER_OFFSET (buffer) = offset;
+ gst_buffer_set_caps (buffer, caps);
+
+ *out_buf = buffer;
+
+ return GST_FLOW_OK;
+}
+
+static gpointer
+pollthread_func (gpointer data)
+{
+ GstShmSink *self = GST_SHM_SINK (data);
+ GList *item;
+
+ while (!self->stop) {
+
+ if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0)
+ return NULL;
+
+ if (self->stop)
+ return NULL;
+
+ if (gst_poll_fd_has_closed (self->poll, &self->serverpollfd)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed read from shmsink"),
+ ("Control socket has closed"));
+ return NULL;
+ }
+
+ if (gst_poll_fd_has_error (self->poll, &self->serverpollfd)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsink"),
+ ("Control socket has error"));
+ return NULL;
+ }
+
+ if (gst_poll_fd_can_read (self->poll, &self->serverpollfd)) {
+ ShmClient *client;
+ struct GstShmClient *gclient;
+
+ GST_OBJECT_LOCK (self);
+ client = sp_writer_accept_client (self->pipe);
+ GST_OBJECT_UNLOCK (self);
+
+ if (!client) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ,
+ ("Failed to read from shmsink"),
+ ("Control socket returns wrong data"));
+ return NULL;
+ }
+
+ gclient = g_slice_new (struct GstShmClient);
+ gclient->client = client;
+ gst_poll_fd_init (&gclient->pollfd);
+ gclient->pollfd.fd = sp_writer_get_client_fd (client);
+ gst_poll_add_fd (self->poll, &gclient->pollfd);
+ gst_poll_fd_ctl_read (self->poll, &gclient->pollfd, TRUE);
+ self->clients = g_list_prepend (self->clients, gclient);
+ g_signal_emit (self, signals[SIGNAL_CLIENT_CONNECTED], 0,
+ gclient->pollfd.fd);
+ }
+
+ again:
+ for (item = self->clients; item; item = item->next) {
+ struct GstShmClient *gclient = item->data;
+
+ if (gst_poll_fd_has_closed (self->poll, &gclient->pollfd)) {
+ GST_WARNING_OBJECT (self, "One client is gone, closing");
+ goto close_client;
+ }
+
+ if (gst_poll_fd_has_error (self->poll, &gclient->pollfd)) {
+ GST_WARNING_OBJECT (self, "One client fd has error, closing");
+ goto close_client;
+ }
+
+ if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) {
+ int rv;
+
+ GST_OBJECT_LOCK (self);
+ rv = sp_writer_recv (self->pipe, gclient->client);
+ GST_OBJECT_UNLOCK (self);
+
+ if (rv < 0) {
+ GST_WARNING_OBJECT (self, "One client has read error,"
+ " closing (retval: %d errno: %d)", rv, errno);
+ goto close_client;
+ }
+ }
+ continue;
+ close_client:
+ GST_OBJECT_LOCK (self);
+ sp_writer_close_client (self->pipe, gclient->client);
+ GST_OBJECT_UNLOCK (self);
+
+ gst_poll_remove_fd (self->poll, &gclient->pollfd);
+ self->clients = g_list_remove (self->clients, gclient);
+
+ g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
+ gclient->pollfd.fd);
+ g_slice_free (struct GstShmClient, gclient);
+
+ goto again;
+ }
+
+ g_cond_broadcast (self->cond);
+ }
+
+ return NULL;
+}
+
+static gboolean
+gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event)
+{
+ GstShmSink *self = GST_SHM_SINK (bsink);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_EOS:
+ GST_OBJECT_LOCK (self);
+ while (self->wait_for_connection && sp_writer_pending_writes (self->pipe)
+ && !self->unlock)
+ g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self));
+ GST_OBJECT_UNLOCK (self);
+ break;
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+
+static gboolean
+gst_shm_sink_unlock (GstBaseSink * bsink)
+{
+ GstShmSink *self = GST_SHM_SINK (bsink);
+
+ GST_OBJECT_LOCK (self);
+ self->unlock = TRUE;
+ GST_OBJECT_UNLOCK (self);
+
+ g_cond_broadcast (self->cond);
+ return TRUE;
+}
+
+static gboolean
+gst_shm_sink_unlock_stop (GstBaseSink * bsink)
+{
+ GstShmSink *self = GST_SHM_SINK (bsink);
+
+ GST_OBJECT_LOCK (self);
+ self->unlock = FALSE;
+ GST_OBJECT_UNLOCK (self);
+
+ return TRUE;
+}
diff --git a/sys/shm/gstshmsink.h b/sys/shm/gstshmsink.h
new file mode 100644
index 000000000..bc6da74bc
--- /dev/null
+++ b/sys/shm/gstshmsink.h
@@ -0,0 +1,76 @@
+/* GStreamer
+ * Copyright (C) <2009> Collabora Ltd
+ * @author: Olivier Crete <olivier.crete@collabora.co.uk
+ * Copyright (C) <2009> Nokia Inc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_SHM_SINK_H__
+#define __GST_SHM_SINK_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+
+#include "shmpipe.h"
+
+G_BEGIN_DECLS
+#define GST_TYPE_SHM_SINK \
+ (gst_shm_sink_get_type())
+#define GST_SHM_SINK(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK,GstShmSink))
+#define GST_SHM_SINK_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK,GstShmSinkClass))
+#define GST_IS_SHM_SINK(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK))
+#define GST_IS_SHM_SINK_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK))
+typedef struct _GstShmSink GstShmSink;
+typedef struct _GstShmSinkClass GstShmSinkClass;
+
+struct _GstShmSink
+{
+ GstBaseSink element;
+
+ gchar *socket_path;
+
+ ShmPipe *pipe;
+
+ guint perms;
+ guint size;
+
+ GList *clients;
+
+ GThread *pollthread;
+ GstPoll *poll;
+ GstPollFD serverpollfd;
+
+ gboolean wait_for_connection;
+ gboolean stop;
+ gboolean unlock;
+
+ GCond *cond;
+};
+
+struct _GstShmSinkClass
+{
+ GstBaseSinkClass parent_class;
+};
+
+GType gst_shm_sink_get_type (void);
+
+G_END_DECLS
+#endif /* __GST_SHM_SINK_H__ */
diff --git a/sys/shm/gstshmsrc.c b/sys/shm/gstshmsrc.c
new file mode 100644
index 000000000..93d2d10af
--- /dev/null
+++ b/sys/shm/gstshmsrc.c
@@ -0,0 +1,386 @@
+/* GStreamer
+ * Copyright (C) <2009> Collabora Ltd
+ * @author: Olivier Crete <olivier.crete@collabora.co.uk
+ * Copyright (C) <2009> Nokia Inc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstshmsrc.h"
+
+#include <gst/gst.h>
+
+#include <string.h>
+
+/* signals */
+enum
+{
+ LAST_SIGNAL
+};
+
+/* properties */
+enum
+{
+ PROP_0,
+ PROP_SOCKET_PATH,
+ PROP_IS_LIVE
+};
+
+struct GstShmBuffer
+{
+ char *buf;
+ GstShmPipe *pipe;
+};
+
+
+GST_DEBUG_CATEGORY_STATIC (shmsrc_debug);
+#define GST_CAT_DEFAULT shmsrc_debug
+
+static const GstElementDetails gst_shm_src_details =
+GST_ELEMENT_DETAILS ("Shared Memory Source",
+ "Source",
+ "Receive data from the sharem memory sink",
+ "Olivier Crete <olivier.crete@collabora.co.uk");
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+GST_BOILERPLATE (GstShmSrc, gst_shm_src, GstPushSrc, GST_TYPE_PUSH_SRC);
+
+static void gst_shm_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_shm_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static gboolean gst_shm_src_start (GstBaseSrc * bsrc);
+static gboolean gst_shm_src_stop (GstBaseSrc * bsrc);
+static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc,
+ GstBuffer ** outbuf);
+static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc);
+static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc);
+
+static void gst_shm_pipe_inc (GstShmPipe * pipe);
+static void gst_shm_pipe_dec (GstShmPipe * pipe);
+
+// static guint gst_shm_src_signals[LAST_SIGNAL] = { 0 };
+
+
+static void
+gst_shm_src_base_init (gpointer g_class)
+{
+ GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&srctemplate));
+
+ gst_element_class_set_details (element_class, &gst_shm_src_details);
+}
+
+static void
+gst_shm_src_class_init (GstShmSrcClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstBaseSrcClass *gstbasesrc_class;
+ GstPushSrcClass *gstpush_src_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstbasesrc_class = (GstBaseSrcClass *) klass;
+ gstpush_src_class = (GstPushSrcClass *) klass;
+
+ gobject_class->set_property = gst_shm_src_set_property;
+ gobject_class->get_property = gst_shm_src_get_property;
+
+ gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
+ gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
+ gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock);
+ gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop);
+
+ gstpush_src_class->create = gst_shm_src_create;
+
+ g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
+ g_param_spec_string ("socket-path",
+ "Path to the control socket",
+ "The path to the control socket used to control the shared memory"
+ " transport", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_IS_LIVE,
+ g_param_spec_boolean ("is-live", "Is this a live source",
+ "True if the element cannot produce data in PAUSED", FALSE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source");
+}
+
+static void
+gst_shm_src_init (GstShmSrc * self, GstShmSrcClass * g_class)
+{
+}
+
+
+static void
+gst_shm_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstShmSrc *self = GST_SHM_SRC (object);
+
+ switch (prop_id) {
+ case PROP_SOCKET_PATH:
+ GST_OBJECT_LOCK (object);
+ if (self->pipe) {
+ GST_WARNING_OBJECT (object, "Can not modify socket path while the "
+ "element is playing");
+ } else {
+ g_free (self->socket_path);
+ self->socket_path = g_value_dup_string (value);
+ }
+ GST_OBJECT_UNLOCK (object);
+ break;
+ case PROP_IS_LIVE:
+ gst_base_src_set_live (GST_BASE_SRC (object),
+ g_value_get_boolean (value));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_shm_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstShmSrc *self = GST_SHM_SRC (object);
+
+ switch (prop_id) {
+ case PROP_SOCKET_PATH:
+ GST_OBJECT_LOCK (object);
+ g_value_set_string (value, self->socket_path);
+ GST_OBJECT_UNLOCK (object);
+ break;
+ case PROP_IS_LIVE:
+ g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (object)));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean
+gst_shm_src_start (GstBaseSrc * bsrc)
+{
+ GstShmSrc *self = GST_SHM_SRC (bsrc);
+ GstShmPipe *gstpipe = g_slice_new0 (GstShmPipe);
+
+ gstpipe->use_count = 1;
+ gstpipe->src = gst_object_ref (self);
+
+ if (!self->socket_path) {
+ GST_ELEMENT_ERROR (bsrc, RESOURCE, NOT_FOUND,
+ ("No path specified for socket."), (NULL));
+ return FALSE;
+ }
+
+ GST_DEBUG ("Opening socket %s", self->socket_path);
+
+ GST_OBJECT_LOCK (self);
+ gstpipe->pipe = sp_client_open (self->socket_path);
+ GST_OBJECT_UNLOCK (self);
+
+ if (!gstpipe->pipe) {
+ GST_ELEMENT_ERROR (bsrc, RESOURCE, OPEN_READ_WRITE,
+ ("Could not open socket %s: %d %s", self->socket_path, errno,
+ strerror (errno)), (NULL));
+ gst_shm_pipe_dec (gstpipe);
+ return FALSE;
+ }
+
+ self->pipe = gstpipe;
+
+ self->poll = gst_poll_new (TRUE);
+ gst_poll_fd_init (&self->pollfd);
+ self->pollfd.fd = sp_get_fd (self->pipe->pipe);
+ gst_poll_add_fd (self->poll, &self->pollfd);
+ gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE);
+
+ return TRUE;
+}
+
+static gboolean
+gst_shm_src_stop (GstBaseSrc * bsrc)
+{
+ GstShmSrc *self = GST_SHM_SRC (bsrc);
+
+ GST_DEBUG_OBJECT (self, "Stopping %p", self);
+
+ if (self->pipe) {
+ gst_shm_pipe_dec (self->pipe);
+ self->pipe = NULL;
+ }
+
+ gst_poll_free (self->poll);
+ self->poll = NULL;
+
+ return TRUE;
+}
+
+
+static void
+free_buffer (gpointer data)
+{
+ struct GstShmBuffer *gsb = data;
+ g_return_if_fail (gsb->pipe != NULL);
+ g_return_if_fail (gsb->pipe->src != NULL);
+
+ GST_LOG ("Freeing buffer %p", gsb->buf);
+
+ GST_OBJECT_LOCK (gsb->pipe->src);
+ sp_client_recv_finish (gsb->pipe->pipe, gsb->buf);
+ GST_OBJECT_UNLOCK (gsb->pipe->src);
+
+ gst_shm_pipe_dec (gsb->pipe);
+
+ g_slice_free (struct GstShmBuffer, gsb);
+}
+
+static GstFlowReturn
+gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
+{
+ GstShmSrc *self = GST_SHM_SRC (psrc);
+ gchar *buf = NULL;
+ int rv = 0;
+ struct GstShmBuffer *gsb;
+
+ do {
+ if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) {
+ if (errno == EBUSY)
+ return GST_FLOW_WRONG_STATE;
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
+ ("Poll failed on fd: %s", strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+
+ if (self->unlocked)
+ return GST_FLOW_WRONG_STATE;
+
+ if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
+ ("Control socket has closed"));
+ return GST_FLOW_ERROR;
+ }
+
+ if (gst_poll_fd_has_error (self->poll, &self->pollfd)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
+ ("Control socket has error"));
+ return GST_FLOW_ERROR;
+ }
+
+ if (gst_poll_fd_can_read (self->poll, &self->pollfd)) {
+ buf = NULL;
+ GST_LOG_OBJECT (self, "Reading from pipe");
+ GST_OBJECT_LOCK (self);
+ rv = sp_client_recv (self->pipe->pipe, &buf);
+ GST_OBJECT_UNLOCK (self);
+ if (rv < 0) {
+ GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
+ ("Error reading control data: %d", rv));
+ return GST_FLOW_ERROR;
+ }
+ }
+ } while (buf == NULL);
+
+ GST_LOG_OBJECT (self, "Got buffer %p of size %d %d", buf, rv);
+
+ gsb = g_slice_new0 (struct GstShmBuffer);
+ gsb->buf = buf;
+ gsb->pipe = self->pipe;
+ gst_shm_pipe_inc (self->pipe);
+
+ *outbuf = gst_buffer_new ();
+ GST_BUFFER_FLAG_SET (*outbuf, GST_BUFFER_FLAG_READONLY);
+ GST_BUFFER_DATA (*outbuf) = (guint8 *) buf;
+ GST_BUFFER_SIZE (*outbuf) = rv;
+ GST_BUFFER_MALLOCDATA (*outbuf) = (guint8 *) gsb;
+ GST_BUFFER_FREE_FUNC (*outbuf) = free_buffer;
+
+ return GST_FLOW_OK;
+}
+
+static gboolean
+gst_shm_src_unlock (GstBaseSrc * bsrc)
+{
+ GstShmSrc *self = GST_SHM_SRC (bsrc);
+
+ self->unlocked = TRUE;
+
+ if (self->poll)
+ gst_poll_set_flushing (self->poll, TRUE);
+
+ return TRUE;
+}
+
+static gboolean
+gst_shm_src_unlock_stop (GstBaseSrc * bsrc)
+{
+ GstShmSrc *self = GST_SHM_SRC (bsrc);
+
+ self->unlocked = FALSE;
+
+ if (self->poll)
+ gst_poll_set_flushing (self->poll, FALSE);
+
+ return TRUE;
+}
+
+static void
+gst_shm_pipe_inc (GstShmPipe * pipe)
+{
+ g_return_if_fail (pipe);
+ g_return_if_fail (pipe->src);
+ g_return_if_fail (pipe->use_count > 0);
+
+ GST_OBJECT_LOCK (pipe->src);
+ pipe->use_count++;
+ GST_OBJECT_UNLOCK (pipe->src);
+}
+
+static void
+gst_shm_pipe_dec (GstShmPipe * pipe)
+{
+ g_return_if_fail (pipe);
+ g_return_if_fail (pipe->src);
+ g_return_if_fail (pipe->use_count > 0);
+
+ GST_OBJECT_LOCK (pipe->src);
+ pipe->use_count--;
+
+ if (pipe->use_count > 0) {
+ GST_OBJECT_UNLOCK (pipe->src);
+ return;
+ }
+
+ if (pipe->pipe)
+ sp_close (pipe->pipe);
+ GST_OBJECT_UNLOCK (pipe->src);
+
+ gst_object_unref (pipe->src);
+ g_slice_free (GstShmPipe, pipe);
+}
diff --git a/sys/shm/gstshmsrc.h b/sys/shm/gstshmsrc.h
new file mode 100644
index 000000000..79d7e8a18
--- /dev/null
+++ b/sys/shm/gstshmsrc.h
@@ -0,0 +1,76 @@
+/* GStreamer
+ * Copyright (C) <2009> Collabora Ltd
+ * @author: Olivier Crete <olivier.crete@collabora.co.uk
+ * Copyright (C) <2009> Nokia Inc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_SHM_SRC_H__
+#define __GST_SHM_SRC_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
+#include <gst/base/gstbasesrc.h>
+
+#include "shmpipe.h"
+
+G_BEGIN_DECLS
+#define GST_TYPE_SHM_SRC \
+ (gst_shm_src_get_type())
+#define GST_SHM_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SRC,GstShmSrc))
+#define GST_SHM_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SRC,GstShmSrcClass))
+#define GST_IS_SHM_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SRC))
+#define GST_IS_SHM_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SRC))
+typedef struct _GstShmSrc GstShmSrc;
+typedef struct _GstShmSrcClass GstShmSrcClass;
+typedef struct _GstShmPipe GstShmPipe;
+
+struct _GstShmSrc
+{
+ GstPushSrc element;
+
+ gchar *socket_path;
+
+ GstShmPipe *pipe;
+ GstPoll *poll;
+ GstPollFD pollfd;
+
+
+ GstFlowReturn flow_return;
+ gboolean unlocked;
+};
+
+struct _GstShmSrcClass
+{
+ GstPushSrcClass parent_class;
+};
+
+GType gst_shm_src_get_type (void);
+
+struct _GstShmPipe {
+ int use_count;
+
+ GstShmSrc *src;
+ ShmPipe *pipe;
+};
+
+G_END_DECLS
+#endif /* __GST_SHM_SRC_H__ */
diff --git a/sys/shm/shmalloc.c b/sys/shm/shmalloc.c
new file mode 100644
index 000000000..75acfbcb2
--- /dev/null
+++ b/sys/shm/shmalloc.c
@@ -0,0 +1,153 @@
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "shmalloc.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+
+struct _ShmAllocSpace
+{
+ size_t size;
+
+ ShmAllocBlock *blocks;
+};
+
+struct _ShmAllocBlock
+{
+ int use_count;
+
+ ShmAllocSpace *space;
+
+ unsigned long offset;
+ unsigned long size;
+
+ ShmAllocBlock *next;
+};
+
+
+ShmAllocSpace *
+shm_alloc_space_new (size_t size)
+{
+ ShmAllocSpace *self = spalloc_new (ShmAllocSpace);
+
+ memset (self, 0, sizeof (ShmAllocSpace));
+
+ self->size = size;
+
+ return self;
+}
+
+void
+shm_alloc_space_free (ShmAllocSpace * self)
+{
+ assert (self && self->blocks == NULL);
+ spalloc_free (ShmAllocSpace, self);
+}
+
+
+ShmAllocBlock *
+shm_alloc_space_alloc_block (ShmAllocSpace * self, unsigned long size)
+{
+ ShmAllocBlock *block;
+ ShmAllocBlock *item = NULL;
+ ShmAllocBlock *prev_item = NULL;
+ unsigned long prev_end_offset = 0;
+
+
+ for (item = self->blocks; item; item = item->next) {
+ unsigned long max_size = 0;
+
+ max_size = item->offset - prev_end_offset;
+
+ if (max_size >= size)
+ break;
+
+ prev_end_offset = item->offset + item->size;
+ prev_item = item;
+ }
+
+ /* Did not find space before an existing block */
+ if (self->blocks && !item) {
+ /* Return NULL if there is no big enough space, otherwise, there is space
+ * at the end */
+ if (self->size - prev_end_offset < size)
+ return NULL;
+ }
+
+ block = spalloc_new (ShmAllocBlock);
+ memset (block, 0, sizeof (ShmAllocBlock));
+ block->offset = prev_end_offset;
+ block->size = size;
+ block->use_count = 1;
+ block->space = self;
+
+ if (prev_item)
+ prev_item->next = block;
+ else
+ self->blocks = block;
+
+ block->next = item;
+
+ return block;
+}
+
+unsigned long
+shm_alloc_space_alloc_block_get_offset (ShmAllocBlock * block)
+{
+ return block->offset;
+}
+
+static void
+shm_alloc_space_free_block (ShmAllocBlock * block)
+{
+ ShmAllocBlock *item = NULL;
+ ShmAllocBlock *prev_item = NULL;
+ ShmAllocSpace *self = block->space;
+
+ for (item = self->blocks; item; item = item->next) {
+ if (item == block) {
+ if (prev_item)
+ prev_item->next = item->next;
+ else
+ self->blocks = item->next;
+ break;
+ }
+ prev_item = item;
+ }
+
+ spalloc_free (ShmAllocBlock, block);
+}
+
+ShmAllocBlock *
+shm_alloc_space_block_get (ShmAllocSpace * self, unsigned long offset)
+{
+ ShmAllocBlock *block = NULL;
+
+ for (block = self->blocks; block; block = block->next) {
+ if (block->offset <= offset && (block->offset + block->size) > offset)
+ return block;
+ }
+
+ return NULL;
+}
+
+
+void
+shm_alloc_space_block_inc (ShmAllocBlock * block)
+{
+ block->use_count++;
+}
+
+void
+shm_alloc_space_block_dec (ShmAllocBlock * block)
+{
+ block->use_count--;
+
+ if (block->use_count <= 0)
+ shm_alloc_space_free_block (block);
+}
diff --git a/sys/shm/shmalloc.h b/sys/shm/shmalloc.h
new file mode 100644
index 000000000..6a0609ddb
--- /dev/null
+++ b/sys/shm/shmalloc.h
@@ -0,0 +1,47 @@
+
+#include <stdlib.h>
+
+#ifndef __SHMALLOC_H__
+#define __SHMALLOC_H__
+
+#ifdef GST_PACKAGE_NAME
+#include <glib.h>
+
+#define spalloc_new(type) g_slice_new (type)
+#define spalloc_alloc(size) g_slice_alloc (size)
+
+#define spalloc_free(type, buf) g_slice_free (type, buf)
+#define spalloc_free1(size, buf) g_slice_free1 (size, buf)
+
+#else
+
+#define spalloc_new(type) malloc (sizeof (type))
+#define spalloc_alloc(size) malloc (size)
+
+#define spalloc_free(type, buf) free (buf)
+#define spalloc_free1(size, buf) free (buf)
+
+#endif
+
+typedef struct _ShmAllocSpace ShmAllocSpace;
+typedef struct _ShmAllocBlock ShmAllocBlock;
+
+ShmAllocSpace *shm_alloc_space_new (size_t size);
+void shm_alloc_space_free (ShmAllocSpace * self);
+
+
+ShmAllocBlock *shm_alloc_space_alloc_block (ShmAllocSpace * self,
+ unsigned long size);
+unsigned long shm_alloc_space_alloc_block_get_offset (ShmAllocBlock *block);
+
+void shm_alloc_space_block_inc (ShmAllocBlock * block);
+void shm_alloc_space_block_dec (ShmAllocBlock * block);
+ShmAllocBlock * shm_alloc_space_block_get (ShmAllocSpace * space,
+ unsigned long offset);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __SHMALLOC_H__ */
diff --git a/sys/shm/shmpipe.c b/sys/shm/shmpipe.c
new file mode 100644
index 000000000..53fc6df8a
--- /dev/null
+++ b/sys/shm/shmpipe.c
@@ -0,0 +1,831 @@
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "shmpipe.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <sys/mman.h>
+#include <assert.h>
+
+/*
+ * The protocol over the pipe is in packets
+ *
+ * The defined types are:
+ * type 1: new shm area
+ * Area length
+ * Size of path (followed by path)
+ *
+ * type 2: Close shm area:
+ * No payload
+ *
+ * type 3: shm buffer
+ * offset
+ * bufsize
+ *
+ * type 4: ack buffer
+ * offset
+ *
+ * Type 4 goes from the client to the server
+ * The rest are from the server to the client
+ * The client should never write in the SHM
+ */
+
+
+#include "shmalloc.h"
+
+enum
+{
+ COMMAND_NEW_SHM_AREA = 1,
+ COMMAND_CLOSE_SHM_AREA = 2,
+ COMMAND_NEW_BUFFER = 3,
+ COMMAND_ACK_BUFFER = 4
+};
+
+typedef struct _ShmArea ShmArea;
+typedef struct _ShmBuffer ShmBuffer;
+
+struct _ShmArea
+{
+ int id;
+
+ int use_count;
+
+ int shm_fd;
+
+ char *shm_area;
+ size_t shm_area_len;
+
+ char *shm_area_name;
+
+ ShmAllocSpace *allocspace;
+
+ ShmArea *next;
+};
+
+struct _ShmBuffer
+{
+ int use_count;
+
+ ShmArea *shm_area;
+ unsigned long offset;
+ size_t size;
+
+ ShmAllocBlock *block;
+
+ ShmBuffer *next;
+
+ int num_clients;
+ int clients[0];
+};
+
+
+struct _ShmPipe
+{
+ int main_socket;
+ char *socket_path;
+
+ ShmArea *shm_area;
+
+ int next_area_id;
+
+ ShmBuffer *buffers;
+
+ int num_clients;
+ ShmClient *clients;
+
+ mode_t perms;
+};
+
+struct _ShmClient
+{
+ int fd;
+
+ ShmClient *next;
+};
+
+struct _ShmBlock
+{
+ ShmPipe *pipe;
+ ShmArea *area;
+ ShmAllocBlock *ablock;
+};
+
+struct CommandBuffer
+{
+ unsigned int type;
+ int area_id;
+
+ union
+ {
+ struct
+ {
+ size_t size;
+ unsigned int path_size;
+ /* Followed by path */
+ } new_shm_area;
+ struct
+ {
+ unsigned long offset;
+ unsigned long size;
+ } buffer;
+ struct
+ {
+ unsigned long offset;
+ } ack_buffer;
+ } payload;
+};
+
+static ShmArea *sp_open_shm (char *path, int id, int writer, mode_t perms,
+ size_t size);
+static void sp_close_shm (ShmPipe * self, ShmArea * area);
+static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
+ ShmBuffer * prev_buf);
+static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
+
+
+
+#define RETURN_ERROR(format, ...) do { \
+ fprintf (stderr, format, __VA_ARGS__); \
+ sp_close (self); \
+ return NULL; } while (0)
+
+ShmPipe *
+sp_writer_create (const char *path, size_t size, mode_t perms)
+{
+ ShmPipe *self = spalloc_new (ShmPipe);
+ int flags;
+ struct sockaddr_un sun;
+ int i = 0;
+
+ memset (self, 0, sizeof (ShmPipe));
+
+ self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
+
+ if (self->main_socket < 0) {
+ RETURN_ERROR ("Could not create socket (%d): %s\n", errno,
+ strerror (errno));
+ }
+
+ flags = fcntl (self->main_socket, F_GETFL, 0);
+ if (flags < 0) {
+ RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno));
+ }
+
+ if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0) {
+ RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno));
+ }
+
+ sun.sun_family = AF_UNIX;
+ strncpy (sun.sun_path, path, sizeof (sun.sun_path) - 1);
+
+ while (bind (self->main_socket, (struct sockaddr *) &sun,
+ sizeof (struct sockaddr_un)) < 0) {
+ if (errno != EADDRINUSE)
+ RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno));
+
+ if (i > 256)
+ RETURN_ERROR ("Could not find a free socket name for %s", path);
+
+ snprintf (sun.sun_path, sizeof (sun.sun_path), "%s.%d", path, i);
+ i++;
+ }
+
+ self->socket_path = strdup (sun.sun_path);
+
+ if (listen (self->main_socket, 10) < 0) {
+ RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno));
+ }
+
+ self->shm_area = sp_open_shm (NULL, ++self->next_area_id, 1, perms, size);
+
+ self->perms = perms;
+
+ if (!self->shm_area) {
+ sp_close (self);
+ return NULL;
+ }
+
+ return self;
+}
+
+#undef RETURN_ERROR
+
+#define RETURN_ERROR(format, ...) \
+ fprintf (stderr, format, __VA_ARGS__); \
+ sp_shm_area_dec (NULL, area); \
+ return NULL;
+
+static ShmArea *
+sp_open_shm (char *path, int id, int writer, mode_t perms, size_t size)
+{
+ ShmArea *area = spalloc_new (ShmArea);
+ char tmppath[PATH_MAX];
+ int flags;
+ int prot;
+ int i = 0;
+
+ memset (area, 0, sizeof (ShmArea));
+
+ area->use_count = 1;
+
+ area->shm_area_len = size;
+
+
+ if (writer)
+ flags = O_RDWR | O_CREAT | O_TRUNC | O_EXCL;
+ else
+ flags = O_RDONLY;
+
+ area->shm_fd = -1;
+
+ if (path) {
+ area->shm_fd = shm_open (path, flags, perms);
+ } else {
+ do {
+ snprintf (tmppath, PATH_MAX, "/shmpipe.5%d.%5d", getpid (), i++);
+ area->shm_fd = shm_open (tmppath, flags, perms);
+ } while (area->shm_fd < 0 && errno == EEXIST);
+ }
+
+ if (area->shm_fd < 0) {
+ RETURN_ERROR ("shm_open failed on %s (%d): %s\n",
+ path ? path : tmppath, errno, strerror (errno));
+ }
+
+ if (!path)
+ area->shm_area_name = strdup (tmppath);
+
+ if (writer) {
+ if (ftruncate (area->shm_fd, size)) {
+ RETURN_ERROR ("Could not resize memory area to header size,"
+ " ftruncate failed (%d): %s\n", errno, strerror (errno));
+ }
+ }
+
+ if (writer)
+ prot = PROT_READ | PROT_WRITE;
+ else
+ prot = PROT_READ;
+
+ area->shm_area = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0);
+
+ if (area->shm_area == MAP_FAILED) {
+ RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno));
+ }
+
+ area->id = id;
+
+ if (writer)
+ area->allocspace = shm_alloc_space_new (area->shm_area_len);
+
+ return area;
+}
+
+#undef RETURN_ERROR
+
+static void
+sp_close_shm (ShmPipe * self, ShmArea * area)
+{
+ assert (area->use_count == 0);
+
+ if (area->allocspace)
+ shm_alloc_space_free (area->allocspace);
+
+ if (self != NULL) {
+ ShmArea *item = NULL;
+ ShmArea *prev_item = NULL;
+
+ for (item = self->shm_area; item; item = item->next) {
+ if (item == area) {
+ if (prev_item)
+ prev_item->next = item->next;
+ else
+ self->shm_area = item->next;
+ break;
+ }
+ prev_item = item;
+ }
+ assert (item);
+ }
+
+ if (area->shm_area != MAP_FAILED)
+ munmap (area->shm_area, area->shm_area_len);
+
+ if (area->shm_fd >= 0)
+ close (area->shm_fd);
+
+ if (area->shm_area_name) {
+ shm_unlink (area->shm_area_name);
+ free (area->shm_area_name);
+ }
+
+ spalloc_free (ShmArea, area);
+}
+
+static void
+sp_shm_area_inc (ShmArea * area)
+{
+ area->use_count++;
+}
+
+static void
+sp_shm_area_dec (ShmPipe * self, ShmArea * area)
+{
+ assert (area->use_count > 0);
+ area->use_count--;
+
+ if (area->use_count == 0) {
+ sp_close_shm (self, area);
+ }
+}
+
+void
+sp_close (ShmPipe * self)
+{
+ if (self->main_socket >= 0)
+ close (self->main_socket);
+
+ if (self->socket_path) {
+ unlink (self->socket_path);
+ free (self->socket_path);
+ }
+
+ while (self->clients)
+ sp_writer_close_client (self, self->clients);
+
+ while (self->shm_area) {
+ sp_shm_area_dec (self, self->shm_area);
+ }
+
+ spalloc_free (ShmPipe, self);
+}
+
+int
+sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
+{
+ self->perms = perms;
+ return fchmod (self->shm_area->shm_fd, perms);
+}
+
+static int
+send_command (int fd, struct CommandBuffer *cb, unsigned short int type,
+ int area_id)
+{
+ cb->type = type;
+ cb->area_id = area_id;
+
+ if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) !=
+ sizeof (struct CommandBuffer))
+ return 0;
+
+ return 1;
+}
+
+int
+sp_writer_resize (ShmPipe * self, size_t size)
+{
+ ShmArea *newarea;
+ ShmArea *old_current;
+ ShmClient *client;
+ int c = 0;
+ int pathlen;
+
+ if (self->shm_area->shm_area_len == size)
+ return 0;
+
+ newarea = sp_open_shm (NULL, ++self->next_area_id, 1, self->perms, size);
+
+ if (!newarea)
+ return -1;
+
+ old_current = self->shm_area;
+ newarea->next = self->shm_area;
+ self->shm_area = newarea;
+
+ pathlen = strlen (newarea->shm_area_name) + 1;
+
+ for (client = self->clients; client; client = client->next) {
+ struct CommandBuffer cb = { 0 };
+
+ if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA,
+ old_current->id))
+ continue;
+
+ cb.payload.new_shm_area.size = newarea->shm_area_len;
+ cb.payload.new_shm_area.path_size = pathlen;
+ if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id))
+ continue;
+
+ if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) !=
+ pathlen)
+ continue;
+ c++;
+ }
+
+ sp_shm_area_dec (self, old_current);
+
+
+ return c;
+}
+
+ShmBlock *
+sp_writer_alloc_block (ShmPipe * self, size_t size)
+{
+ ShmBlock *block;
+ ShmAllocBlock *ablock =
+ shm_alloc_space_alloc_block (self->shm_area->allocspace, size);
+
+ if (!ablock)
+ return NULL;
+
+ block = spalloc_new (ShmBlock);
+ sp_shm_area_inc (self->shm_area);
+ block->pipe = self;
+ block->area = self->shm_area;
+ block->ablock = ablock;
+ return block;
+}
+
+char *
+sp_writer_block_get_buf (ShmBlock * block)
+{
+ return block->area->shm_area +
+ shm_alloc_space_alloc_block_get_offset (block->ablock);
+}
+
+void
+sp_writer_free_block (ShmBlock * block)
+{
+ shm_alloc_space_block_dec (block->ablock);
+ sp_shm_area_dec (block->pipe, block->area);
+ spalloc_free (ShmBlock, block);
+}
+
+/* Returns the number of client this has successfully been sent to */
+
+int
+sp_writer_send_buf (ShmPipe * self, char *buf, size_t size)
+{
+ ShmArea *area = NULL;
+ unsigned long offset = 0;
+ unsigned long bsize = size;
+ ShmBuffer *sb;
+ ShmClient *client = NULL;
+ ShmAllocBlock *block = NULL;
+ int i = 0;
+ int c = 0;
+
+ if (self->num_clients == 0)
+ return 0;
+
+ for (area = self->shm_area; area; area = area->next) {
+ if (buf >= area->shm_area && buf < (area->shm_area + area->shm_area_len)) {
+ offset = buf - area->shm_area;
+ block = shm_alloc_space_block_get (area->allocspace, offset);
+ assert (block);
+ break;
+ }
+ }
+
+ if (!block)
+ return -1;
+
+ sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
+ memset (sb, 0, sizeof (ShmBuffer));
+ memset (sb->clients, -1, sizeof (int) * self->num_clients);
+ sb->shm_area = area;
+ sb->offset = offset;
+ sb->size = size;
+ sb->num_clients = self->num_clients;
+ sb->block = block;
+
+ for (client = self->clients; client; client = client->next) {
+ struct CommandBuffer cb = { 0 };
+ cb.payload.buffer.offset = offset;
+ cb.payload.buffer.size = bsize;
+ if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
+ continue;
+ sb->clients[i++] = client->fd;
+ c++;
+ }
+
+ if (c == 0) {
+ spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * self->num_clients, sb);
+ return 0;
+ }
+
+ sp_shm_area_inc (area);
+ shm_alloc_space_block_inc (block);
+
+ sb->use_count = c;
+
+ sb->next = self->buffers;
+ self->buffers = sb;
+
+ return c;
+}
+
+static int
+recv_command (int fd, struct CommandBuffer *cb)
+{
+ int retval;
+
+ retval = recv (fd, cb, sizeof (struct CommandBuffer), MSG_DONTWAIT);
+ if (retval == sizeof (struct CommandBuffer)) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+unsigned long
+sp_client_recv (ShmPipe * self, char **buf)
+{
+ char *area_name = NULL;
+ ShmArea *newarea, *oldarea;
+ ShmArea *area;
+ struct CommandBuffer cb;
+ int retval;
+
+ if (!recv_command (self->main_socket, &cb))
+ return -1;
+
+ switch (cb.type) {
+ case COMMAND_NEW_SHM_AREA:
+ assert (cb.payload.new_shm_area.path_size > 0);
+ assert (cb.payload.new_shm_area.size > 0);
+
+ area_name = malloc (cb.payload.new_shm_area.path_size);
+ retval = recv (self->main_socket, area_name,
+ cb.payload.new_shm_area.path_size, 0);
+ if (retval != cb.payload.new_shm_area.path_size) {
+ free (area_name);
+ return -3;
+ }
+
+ newarea = sp_open_shm (area_name, cb.area_id, 0, 0,
+ cb.payload.new_shm_area.size);
+ free (area_name);
+ if (!newarea)
+ return -4;
+
+ oldarea = self->shm_area;
+ newarea->next = self->shm_area;
+ self->shm_area = newarea;
+ /*
+ if (oldarea)
+ sp_shm_area_dec (self, oldarea);
+ */
+ break;
+
+ case COMMAND_CLOSE_SHM_AREA:
+ for (area = self->shm_area; area; area = area->next) {
+ if (area->id == cb.area_id) {
+ sp_shm_area_dec (self, area);
+ break;
+ }
+ }
+ break;
+
+ case COMMAND_NEW_BUFFER:
+ assert (buf);
+ for (area = self->shm_area; area; area = area->next) {
+ if (area->id == cb.area_id) {
+ *buf = area->shm_area + cb.payload.buffer.offset;
+ sp_shm_area_inc (area);
+ return cb.payload.buffer.size;
+ }
+ }
+ return -23;
+
+ default:
+ return -99;
+ }
+
+ return 0;
+}
+
+int
+sp_writer_recv (ShmPipe * self, ShmClient * client)
+{
+ ShmBuffer *buf = NULL, *prev_buf = NULL;
+ struct CommandBuffer cb;
+
+ if (!recv_command (client->fd, &cb))
+ return -1;
+
+ switch (cb.type) {
+ case COMMAND_ACK_BUFFER:
+
+ for (buf = self->buffers; buf; buf = buf->next) {
+ if (buf->shm_area->id == cb.area_id &&
+ buf->offset == cb.payload.ack_buffer.offset) {
+ sp_shmbuf_dec (self, buf, prev_buf);
+ break;
+ }
+ prev_buf = buf;
+ }
+
+ if (!buf)
+ return -2;
+
+ break;
+ default:
+ return -99;
+ }
+
+ return 0;
+}
+
+int
+sp_client_recv_finish (ShmPipe * self, char *buf)
+{
+ ShmArea *shm_area = NULL;
+ unsigned long offset;
+ struct CommandBuffer cb = { 0 };
+
+ for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) {
+ if (buf >= shm_area->shm_area &&
+ buf < shm_area->shm_area + shm_area->shm_area_len)
+ break;
+ }
+
+ assert (shm_area);
+
+ offset = buf - shm_area->shm_area;
+
+ sp_shm_area_dec (self, shm_area);
+
+ cb.payload.ack_buffer.offset = offset;
+ return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER,
+ self->shm_area->id);
+}
+
+ShmPipe *
+sp_client_open (const char *path)
+{
+ ShmPipe *self = spalloc_new (ShmPipe);
+ struct sockaddr_un sun;
+
+ memset (self, 0, sizeof (ShmPipe));
+
+ self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
+ if (self->main_socket < 0) {
+ sp_close (self);
+ return NULL;
+ }
+
+ sun.sun_family = AF_UNIX;
+ strncpy (sun.sun_path, path, sizeof (sun.sun_path) - 1);
+
+ if (connect (self->main_socket, (struct sockaddr *) &sun,
+ sizeof (struct sockaddr_un)) < 0)
+ goto error;
+
+ return self;
+
+error:
+ spalloc_free (ShmPipe, self);
+ return NULL;
+}
+
+
+ShmClient *
+sp_writer_accept_client (ShmPipe * self)
+{
+ ShmClient *client = NULL;
+ int fd;
+ struct CommandBuffer cb = { 0 };
+ int pathlen = strlen (self->shm_area->shm_area_name) + 1;
+
+
+ fd = accept (self->main_socket, NULL, NULL);
+
+ if (fd < 0) {
+ fprintf (stderr, "Could not client connection");
+ return NULL;
+ }
+
+ cb.payload.new_shm_area.size = self->shm_area->shm_area_len;
+ cb.payload.new_shm_area.path_size = pathlen;
+ if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) {
+ fprintf (stderr, "Sending new shm area failed: %s", strerror (errno));
+ goto error;
+ }
+
+ if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) !=
+ pathlen) {
+ fprintf (stderr, "Sending new shm area path failed: %s", strerror (errno));
+ goto error;
+ }
+
+ client = spalloc_new (ShmClient);
+ client->fd = fd;
+
+ /* Prepend ot linked list */
+ client->next = self->clients;
+ self->clients = client;
+ self->num_clients++;
+
+ return client;
+
+error:
+ close (fd);
+ return NULL;
+}
+
+static int
+sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf)
+{
+ buf->use_count--;
+
+ if (buf->use_count == 0) {
+ /* Remove from linked list */
+ if (prev_buf)
+ prev_buf->next = buf->next;
+ else
+ self->buffers = buf->next;
+
+ shm_alloc_space_block_dec (buf->block);
+ sp_shm_area_dec (self, buf->shm_area);
+ spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
+ return 0;
+ }
+
+ return 1;
+}
+
+void
+sp_writer_close_client (ShmPipe * self, ShmClient * client)
+{
+ ShmBuffer *buffer = NULL, *prev_buf = NULL;
+ ShmClient *item = NULL, *prev_item = NULL;
+
+ close (client->fd);
+
+again:
+ for (buffer = self->buffers; buffer; buffer = buffer->next) {
+ int i;
+
+ for (i = 0; i < buffer->num_clients; i++) {
+ if (buffer->clients[i] == client->fd) {
+ buffer->clients[i] = -1;
+ if (!sp_shmbuf_dec (self, buffer, prev_buf))
+ goto again;
+ break;
+ }
+ prev_buf = buffer;
+ }
+ }
+
+ for (item = self->clients; item; item = item->next) {
+ if (item == client)
+ break;
+ prev_item = item;
+ }
+ assert (item);
+
+ if (prev_item)
+ prev_item->next = client->next;
+ else
+ self->clients = client->next;
+
+ self->num_clients--;
+
+ spalloc_free (ShmClient, client);
+}
+
+int
+sp_get_fd (ShmPipe * self)
+{
+ return self->main_socket;
+}
+
+int
+sp_writer_get_client_fd (ShmClient * client)
+{
+ return client->fd;
+}
+
+int
+sp_writer_pending_writes (ShmPipe * self)
+{
+ return (self->buffers != NULL);
+}
+
+const char *
+sp_writer_get_path (ShmPipe * pipe)
+{
+ return pipe->socket_path;
+}
diff --git a/sys/shm/shmpipe.h b/sys/shm/shmpipe.h
new file mode 100644
index 000000000..f3657b676
--- /dev/null
+++ b/sys/shm/shmpipe.h
@@ -0,0 +1,78 @@
+/*
+ *
+ * First, create a writer with sp_writer_create()
+ * And selectes() on the socket from sp_get_fd()
+ * If the socket is closed or there are errors from any function, the app
+ * should call sp_close() and assume the writer is dead
+ * The server calls sp_writer_accept_client() when there is something to read
+ * from the server fd
+ * It then needs to select() on the socket from sp_writer_get_client_fd()
+ * If it gets an error on that socket, it call sp_writer_close_client().
+ * If there is something to read, it calls sp_writer_recv().
+ *
+ * The writer allocates buffers with sp_writer_alloc_block(),
+ * writes something in the buffer (retrieved with sp_writer_block_get_buf(),
+ * then calls sp_writer_send_buf() to send the buffer or a subsection to
+ * the other side. When it is done with the block, it calls
+ * sp_writer_free_block().
+ * If alloc fails, then the server must wait for events from the clients before
+ * trying again.
+ *
+ *
+ * The clients connect with sp_client_open()
+ * And select() on the fd from sp_get_fd() until there is something to read.
+ * Then they must read using sp_client_recv() which will return > 0 if there
+ * is a valid buffer (which is read only). It will return 0 if it is an internal
+ * message and <0 if there was an error. If there was an error, one must close
+ * it with sp_close(). If was valid buffer was received, the client must release
+ * it with sp_client_recv_finish() when it is done reading from it.
+ */
+
+
+#ifndef __SHMPIPE_H__
+#define __SHMPIPE_H__
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _ShmClient ShmClient;
+typedef struct _ShmPipe ShmPipe;
+typedef struct _ShmBlock ShmBlock;
+
+ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms);
+const char *sp_writer_get_path (ShmPipe *pipe);
+void sp_close (ShmPipe * self);
+
+int sp_writer_setperms_shm (ShmPipe * self, mode_t perms);
+int sp_writer_resize (ShmPipe * self, size_t size);
+
+int sp_get_fd (ShmPipe * self);
+int sp_writer_get_client_fd (ShmClient * client);
+
+ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size);
+void sp_writer_free_block (ShmBlock *block);
+int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size);
+char *sp_writer_block_get_buf (ShmBlock *block);
+
+ShmClient * sp_writer_accept_client (ShmPipe * self);
+void sp_writer_close_client (ShmPipe *self, ShmClient * client);
+int sp_writer_recv (ShmPipe * self, ShmClient * client);
+
+int sp_writer_pending_writes (ShmPipe * self);
+
+ShmPipe *sp_client_open (const char *path);
+unsigned long sp_client_recv (ShmPipe * self, char **buf);
+int sp_client_recv_finish (ShmPipe * self, char *buf);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __SHMPIPE_H__ */