diff options
author | Olivier CrĂȘte <olivier.crete@collabora.co.uk> | 2010-06-03 14:42:06 -0400 |
---|---|---|
committer | Olivier CrĂȘte <olivier.crete@collabora.co.uk> | 2010-06-03 15:27:43 -0400 |
commit | 21686e30779b225e903fb7a6e90e9c5a81c7c9d0 (patch) | |
tree | c51039c8d942a59f72cebafffde6f6dffffc8d10 /sys/shm | |
parent | 536e79befd17d62aba4eaa6308fef6128dd5fa09 (diff) |
shm: Move to sys/ since it doesn't exist on windows
Diffstat (limited to 'sys/shm')
-rw-r--r-- | sys/shm/Makefile.am | 13 | ||||
-rw-r--r-- | sys/shm/gstshm.c | 42 | ||||
-rw-r--r-- | sys/shm/gstshmsink.c | 602 | ||||
-rw-r--r-- | sys/shm/gstshmsink.h | 76 | ||||
-rw-r--r-- | sys/shm/gstshmsrc.c | 386 | ||||
-rw-r--r-- | sys/shm/gstshmsrc.h | 76 | ||||
-rw-r--r-- | sys/shm/shmalloc.c | 153 | ||||
-rw-r--r-- | sys/shm/shmalloc.h | 47 | ||||
-rw-r--r-- | sys/shm/shmpipe.c | 831 | ||||
-rw-r--r-- | sys/shm/shmpipe.h | 78 |
10 files changed, 2304 insertions, 0 deletions
diff --git a/sys/shm/Makefile.am b/sys/shm/Makefile.am new file mode 100644 index 000000000..fdc034b71 --- /dev/null +++ b/sys/shm/Makefile.am @@ -0,0 +1,13 @@ +glib_enum_prefix = gst_shm + +include $(top_srcdir)/common/glib-gen.mak + +plugin_LTLIBRARIES = libgstshm.la + +libgstshm_la_SOURCES = shmpipe.c shmalloc.c gstshm.c gstshmsrc.c gstshmsink.c +libgstshm_la_CFLAGS = $(GST_CFLAGS) +libgstshm_la_LIBADD = -lrt +libgstshm_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) +libgstshm_la_LIBTOOLFLAGS = --tag=disable-static + +noinst_HEADERS = gstshmsrc.h gstshmsink.h shmpipe.h shmalloc.h diff --git a/sys/shm/gstshm.c b/sys/shm/gstshm.c new file mode 100644 index 000000000..d57db6755 --- /dev/null +++ b/sys/shm/gstshm.c @@ -0,0 +1,42 @@ +/* GStreamer + * Copyright (C) <2009> Collabora Ltd + * @author: Olivier Crete <olivier.crete@collabora.co.uk + * Copyright (C) <2009> Nokia Inc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstshmsrc.h" +#include "gstshmsink.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "shmsrc", + GST_RANK_NONE, GST_TYPE_SHM_SRC) && + gst_element_register (plugin, "shmsink", + GST_RANK_NONE, GST_TYPE_SHM_SINK); +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "shm", + "shared memory sink source", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/sys/shm/gstshmsink.c b/sys/shm/gstshmsink.c new file mode 100644 index 000000000..ba8608fa9 --- /dev/null +++ b/sys/shm/gstshmsink.c @@ -0,0 +1,602 @@ +/* GStreamer + * Copyright (C) <2009> Collabora Ltd + * @author: Olivier Crete <olivier.crete@collabora.co.uk + * Copyright (C) <2009> Nokia Inc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstshmsink.h" + +#include <gst/gst.h> + +#include <string.h> + +/* signals */ +enum +{ + SIGNAL_CLIENT_CONNECTED, + SIGNAL_CLIENT_DISCONNECTED, + LAST_SIGNAL +}; + +/* properties */ +enum +{ + PROP_0, + PROP_SOCKET_PATH, + PROP_PERMS, + PROP_SHM_SIZE, + PROP_WAIT_FOR_CONNECTION +}; + +struct GstShmClient +{ + ShmClient *client; + GstPollFD pollfd; +}; + +#define DEFAULT_SIZE ( 256 * 1024 ) +#define DEFAULT_WAIT_FOR_CONNECTION (TRUE) +#define DEFAULT_PERMS (S_IRWXU | S_IRWXG) + + +GST_DEBUG_CATEGORY_STATIC (shmsink_debug); +#define GST_CAT_DEFAULT shmsink_debug + +static const GstElementDetails gst_shm_sink_details = +GST_ELEMENT_DETAILS ("Shared Memory Sink", + "Sink", + "Send data over shared memory to the matching source", + "Olivier Crete <olivier.crete@collabora.co.uk>"); + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_BOILERPLATE (GstShmSink, gst_shm_sink, GstBaseSink, GST_TYPE_BASE_SINK); + +static void gst_shm_sink_finalize (GObject * object); +static void gst_shm_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_shm_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static gboolean gst_shm_sink_start (GstBaseSink * bsink); +static gboolean gst_shm_sink_stop (GstBaseSink * bsink); +static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf); +static GstFlowReturn gst_shm_sink_buffer_alloc (GstBaseSink * sink, + guint64 offset, guint size, GstCaps * caps, GstBuffer ** out_buf); + +static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event); +static gboolean gst_shm_sink_unlock (GstBaseSink * bsink); +static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink); + +static gpointer pollthread_func (gpointer data); + +static guint signals[LAST_SIGNAL] = { 0 }; + +static void +gst_shm_sink_base_init (gpointer g_class) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&sinktemplate)); + + gst_element_class_set_details (element_class, &gst_shm_sink_details); +} + + +static void +gst_shm_sink_init (GstShmSink * self, GstShmSinkClass * g_class) +{ + self->cond = g_cond_new (); + self->size = DEFAULT_SIZE; + self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION; + self->perms = DEFAULT_PERMS; +} + +static void +gst_shm_sink_class_init (GstShmSinkClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; + + gobject_class->finalize = gst_shm_sink_finalize; + gobject_class->set_property = gst_shm_sink_set_property; + gobject_class->get_property = gst_shm_sink_get_property; + + gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop); + gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render); + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock); + gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop); + gstbasesink_class->buffer_alloc = + GST_DEBUG_FUNCPTR (gst_shm_sink_buffer_alloc); + + g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, + g_param_spec_string ("socket-path", + "Path to the control socket", + "The path to the control socket used to control the shared memory" + " transport", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_PERMS, + g_param_spec_uint ("perms", + "Permissions on the shm area", + "Permissions to set on the shm area", + 0, 07777, DEFAULT_PERMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_SHM_SIZE, + g_param_spec_uint ("shm-size", + "Size of the shm area", + "Size of the shared memory area", + 0, G_MAXUINT, DEFAULT_SIZE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION, + g_param_spec_boolean ("wait-for-connection", + "Wait for a connection until rendering", + "Block the stream until the shm pipe is connected", + DEFAULT_WAIT_FOR_CONNECTION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected", + GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, + g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + + signals[SIGNAL_CLIENT_DISCONNECTED] = g_signal_new ("client-disconnected", + GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, + g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + + GST_DEBUG_CATEGORY_INIT (shmsink_debug, "shmsink", 0, "Shared Memory Sink"); +} + +static void +gst_shm_sink_finalize (GObject * object) +{ + GstShmSink *self = GST_SHM_SINK (object); + + g_cond_free (self->cond); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +/* + * Set the value of a property for the server sink. + */ +static void +gst_shm_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstShmSink *self = GST_SHM_SINK (object); + int ret = 0; + + switch (prop_id) { + case PROP_SOCKET_PATH: + GST_OBJECT_LOCK (object); + g_free (self->socket_path); + self->socket_path = g_value_dup_string (value); + GST_OBJECT_UNLOCK (object); + break; + case PROP_PERMS: + GST_OBJECT_LOCK (object); + self->perms = g_value_get_uint (value); + if (self->pipe) + ret = sp_writer_setperms_shm (self->pipe, self->perms); + GST_OBJECT_UNLOCK (object); + if (ret < 0) + GST_WARNING_OBJECT (object, "Could not set permissions on pipe: %s", + strerror (ret)); + break; + case PROP_SHM_SIZE: + GST_OBJECT_LOCK (object); + if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) + GST_DEBUG_OBJECT (self, + "Resize shared memory area from %u to %u bytes"); + else + GST_WARNING_OBJECT (self, + "Could not resize shared memory area from %u to %u bytes"); + self->size = g_value_get_uint (value); + GST_OBJECT_UNLOCK (object); + break; + case PROP_WAIT_FOR_CONNECTION: + GST_OBJECT_LOCK (object); + self->wait_for_connection = g_value_get_boolean (value); + GST_OBJECT_UNLOCK (object); + g_cond_broadcast (self->cond); + break; + default: + break; + } +} + +static void +gst_shm_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstShmSink *self = GST_SHM_SINK (object); + + GST_OBJECT_LOCK (object); + + switch (prop_id) { + case PROP_SOCKET_PATH: + g_value_set_string (value, self->socket_path); + break; + case PROP_PERMS: + self->perms = g_value_get_uint (value); + if (self->pipe) { + int ret; + + ret = sp_writer_setperms_shm (self->pipe, self->perms); + if (ret < 0) + GST_WARNING_OBJECT (object, "Could not set permissions on pipe: %s", + strerror (ret)); + } + break; + case PROP_SHM_SIZE: + g_value_set_uint (value, self->size); + break; + case PROP_WAIT_FOR_CONNECTION: + g_value_set_boolean (value, self->wait_for_connection); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_OBJECT_UNLOCK (object); +} + + + +static gboolean +gst_shm_sink_start (GstBaseSink * bsink) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + + self->stop = FALSE; + + if (!self->socket_path) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, + ("Could not open socket."), (NULL)); + return FALSE; + } + + GST_DEBUG_OBJECT (self, "Creating new socket at %s" + " with shared memory of %d bytes", self->socket_path, self->size); + + self->pipe = sp_writer_create (self->socket_path, self->size, self->perms); + + if (!self->pipe) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, + ("Could not open socket."), (NULL)); + return FALSE; + } + + g_free (self->socket_path); + self->socket_path = g_strdup (sp_writer_get_path (self->pipe)); + + GST_DEBUG ("Created socket at %s", self->socket_path); + + self->poll = gst_poll_new (TRUE); + gst_poll_fd_init (&self->serverpollfd); + self->serverpollfd.fd = sp_get_fd (self->pipe); + gst_poll_add_fd (self->poll, &self->serverpollfd); + gst_poll_fd_ctl_read (self->poll, &self->serverpollfd, TRUE); + + self->pollthread = g_thread_create (pollthread_func, self, TRUE, NULL); + + if (!self->pollthread) + goto thread_error; + + return TRUE; + +thread_error: + + sp_close (self->pipe); + self->pipe = NULL; + gst_poll_free (self->poll); + + GST_ELEMENT_ERROR (self, CORE, THREAD, ("Could not srart thread"), (NULL)); + return FALSE; +} + + +static gboolean +gst_shm_sink_stop (GstBaseSink * bsink) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + + self->stop = TRUE; + gst_poll_set_flushing (self->poll, TRUE); + + g_thread_join (self->pollthread); + self->pollthread = NULL; + + GST_DEBUG_OBJECT (self, "Stopping"); + + while (self->clients) { + struct GstShmClient *client = self->clients->data; + self->clients = g_list_remove (self->clients, client); + sp_writer_close_client (self->pipe, client->client); + g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, + client->pollfd.fd); + g_slice_free (struct GstShmClient, client); + } + + gst_poll_free (self->poll); + self->poll = NULL; + + sp_close (self->pipe); + self->pipe = NULL; + + return TRUE; +} + +static GstFlowReturn +gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + int rv; + + GST_OBJECT_LOCK (self); + while (self->wait_for_connection && !self->clients) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } + + rv = sp_writer_send_buf (self->pipe, (char *) GST_BUFFER_DATA (buf), + GST_BUFFER_SIZE (buf)); + + if (rv == -1) { + ShmBlock *block = NULL; + gchar *shmbuf = NULL; + while ((block = sp_writer_alloc_block (self->pipe, + GST_BUFFER_SIZE (buf))) == NULL) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } + while (self->wait_for_connection && !self->clients) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + sp_writer_free_block (block); + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } + + + shmbuf = sp_writer_block_get_buf (block); + memcpy (shmbuf, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); + sp_writer_send_buf (self->pipe, shmbuf, GST_BUFFER_SIZE (buf)); + sp_writer_free_block (block); + } + + GST_OBJECT_UNLOCK (self); + + return GST_FLOW_OK; +} + +void +gst_shm_sink_free_buffer (gpointer data) +{ + ShmBlock *block = data; + sp_writer_free_block (block); +} + +static GstFlowReturn +gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, + GstCaps * caps, GstBuffer ** out_buf) +{ + GstShmSink *self = GST_SHM_SINK (sink); + GstBuffer *buffer; + ShmBlock *block = NULL; + gpointer buf = NULL; + + GST_OBJECT_LOCK (self); + block = sp_writer_alloc_block (self->pipe, size); + if (block) + buf = sp_writer_block_get_buf (block); + GST_OBJECT_UNLOCK (self); + + if (block) { + buffer = gst_buffer_new (); + GST_BUFFER_DATA (buffer) = buf; + GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block; + GST_BUFFER_FREE_FUNC (buffer) = + GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer); + GST_BUFFER_SIZE (buffer) = size; + GST_LOG_OBJECT (self, + "Allocated buffer of %lu bytes from shared memory at %p", size, buf); + } else { + buffer = gst_buffer_new_and_alloc (size); + GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %lu bytes, " + "allocating using standard allocator", size); + } + + GST_BUFFER_OFFSET (buffer) = offset; + gst_buffer_set_caps (buffer, caps); + + *out_buf = buffer; + + return GST_FLOW_OK; +} + +static gpointer +pollthread_func (gpointer data) +{ + GstShmSink *self = GST_SHM_SINK (data); + GList *item; + + while (!self->stop) { + + if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) + return NULL; + + if (self->stop) + return NULL; + + if (gst_poll_fd_has_closed (self->poll, &self->serverpollfd)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed read from shmsink"), + ("Control socket has closed")); + return NULL; + } + + if (gst_poll_fd_has_error (self->poll, &self->serverpollfd)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsink"), + ("Control socket has error")); + return NULL; + } + + if (gst_poll_fd_can_read (self->poll, &self->serverpollfd)) { + ShmClient *client; + struct GstShmClient *gclient; + + GST_OBJECT_LOCK (self); + client = sp_writer_accept_client (self->pipe); + GST_OBJECT_UNLOCK (self); + + if (!client) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, + ("Failed to read from shmsink"), + ("Control socket returns wrong data")); + return NULL; + } + + gclient = g_slice_new (struct GstShmClient); + gclient->client = client; + gst_poll_fd_init (&gclient->pollfd); + gclient->pollfd.fd = sp_writer_get_client_fd (client); + gst_poll_add_fd (self->poll, &gclient->pollfd); + gst_poll_fd_ctl_read (self->poll, &gclient->pollfd, TRUE); + self->clients = g_list_prepend (self->clients, gclient); + g_signal_emit (self, signals[SIGNAL_CLIENT_CONNECTED], 0, + gclient->pollfd.fd); + } + + again: + for (item = self->clients; item; item = item->next) { + struct GstShmClient *gclient = item->data; + + if (gst_poll_fd_has_closed (self->poll, &gclient->pollfd)) { + GST_WARNING_OBJECT (self, "One client is gone, closing"); + goto close_client; + } + + if (gst_poll_fd_has_error (self->poll, &gclient->pollfd)) { + GST_WARNING_OBJECT (self, "One client fd has error, closing"); + goto close_client; + } + + if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) { + int rv; + + GST_OBJECT_LOCK (self); + rv = sp_writer_recv (self->pipe, gclient->client); + GST_OBJECT_UNLOCK (self); + + if (rv < 0) { + GST_WARNING_OBJECT (self, "One client has read error," + " closing (retval: %d errno: %d)", rv, errno); + goto close_client; + } + } + continue; + close_client: + GST_OBJECT_LOCK (self); + sp_writer_close_client (self->pipe, gclient->client); + GST_OBJECT_UNLOCK (self); + + gst_poll_remove_fd (self->poll, &gclient->pollfd); + self->clients = g_list_remove (self->clients, gclient); + + g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, + gclient->pollfd.fd); + g_slice_free (struct GstShmClient, gclient); + + goto again; + } + + g_cond_broadcast (self->cond); + } + + return NULL; +} + +static gboolean +gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_OBJECT_LOCK (self); + while (self->wait_for_connection && sp_writer_pending_writes (self->pipe) + && !self->unlock) + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + GST_OBJECT_UNLOCK (self); + break; + default: + break; + } + + return TRUE; +} + + +static gboolean +gst_shm_sink_unlock (GstBaseSink * bsink) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + + GST_OBJECT_LOCK (self); + self->unlock = TRUE; + GST_OBJECT_UNLOCK (self); + + g_cond_broadcast (self->cond); + return TRUE; +} + +static gboolean +gst_shm_sink_unlock_stop (GstBaseSink * bsink) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + + GST_OBJECT_LOCK (self); + self->unlock = FALSE; + GST_OBJECT_UNLOCK (self); + + return TRUE; +} diff --git a/sys/shm/gstshmsink.h b/sys/shm/gstshmsink.h new file mode 100644 index 000000000..bc6da74bc --- /dev/null +++ b/sys/shm/gstshmsink.h @@ -0,0 +1,76 @@ +/* GStreamer + * Copyright (C) <2009> Collabora Ltd + * @author: Olivier Crete <olivier.crete@collabora.co.uk + * Copyright (C) <2009> Nokia Inc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_SHM_SINK_H__ +#define __GST_SHM_SINK_H__ + +#include <gst/gst.h> +#include <gst/base/gstbasesink.h> + +#include "shmpipe.h" + +G_BEGIN_DECLS +#define GST_TYPE_SHM_SINK \ + (gst_shm_sink_get_type()) +#define GST_SHM_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK,GstShmSink)) +#define GST_SHM_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK,GstShmSinkClass)) +#define GST_IS_SHM_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK)) +#define GST_IS_SHM_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK)) +typedef struct _GstShmSink GstShmSink; +typedef struct _GstShmSinkClass GstShmSinkClass; + +struct _GstShmSink +{ + GstBaseSink element; + + gchar *socket_path; + + ShmPipe *pipe; + + guint perms; + guint size; + + GList *clients; + + GThread *pollthread; + GstPoll *poll; + GstPollFD serverpollfd; + + gboolean wait_for_connection; + gboolean stop; + gboolean unlock; + + GCond *cond; +}; + +struct _GstShmSinkClass +{ + GstBaseSinkClass parent_class; +}; + +GType gst_shm_sink_get_type (void); + +G_END_DECLS +#endif /* __GST_SHM_SINK_H__ */ diff --git a/sys/shm/gstshmsrc.c b/sys/shm/gstshmsrc.c new file mode 100644 index 000000000..93d2d10af --- /dev/null +++ b/sys/shm/gstshmsrc.c @@ -0,0 +1,386 @@ +/* GStreamer + * Copyright (C) <2009> Collabora Ltd + * @author: Olivier Crete <olivier.crete@collabora.co.uk + * Copyright (C) <2009> Nokia Inc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstshmsrc.h" + +#include <gst/gst.h> + +#include <string.h> + +/* signals */ +enum +{ + LAST_SIGNAL +}; + +/* properties */ +enum +{ + PROP_0, + PROP_SOCKET_PATH, + PROP_IS_LIVE +}; + +struct GstShmBuffer +{ + char *buf; + GstShmPipe *pipe; +}; + + +GST_DEBUG_CATEGORY_STATIC (shmsrc_debug); +#define GST_CAT_DEFAULT shmsrc_debug + +static const GstElementDetails gst_shm_src_details = +GST_ELEMENT_DETAILS ("Shared Memory Source", + "Source", + "Receive data from the sharem memory sink", + "Olivier Crete <olivier.crete@collabora.co.uk"); + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_BOILERPLATE (GstShmSrc, gst_shm_src, GstPushSrc, GST_TYPE_PUSH_SRC); + +static void gst_shm_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_shm_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static gboolean gst_shm_src_start (GstBaseSrc * bsrc); +static gboolean gst_shm_src_stop (GstBaseSrc * bsrc); +static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc, + GstBuffer ** outbuf); +static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc); +static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc); + +static void gst_shm_pipe_inc (GstShmPipe * pipe); +static void gst_shm_pipe_dec (GstShmPipe * pipe); + +// static guint gst_shm_src_signals[LAST_SIGNAL] = { 0 }; + + +static void +gst_shm_src_base_init (gpointer g_class) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&srctemplate)); + + gst_element_class_set_details (element_class, &gst_shm_src_details); +} + +static void +gst_shm_src_class_init (GstShmSrcClass * klass) +{ + GObjectClass *gobject_class; + GstBaseSrcClass *gstbasesrc_class; + GstPushSrcClass *gstpush_src_class; + + gobject_class = (GObjectClass *) klass; + gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpush_src_class = (GstPushSrcClass *) klass; + + gobject_class->set_property = gst_shm_src_set_property; + gobject_class->get_property = gst_shm_src_get_property; + + gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start); + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop); + gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock); + gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop); + + gstpush_src_class->create = gst_shm_src_create; + + g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, + g_param_spec_string ("socket-path", + "Path to the control socket", + "The path to the control socket used to control the shared memory" + " transport", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_IS_LIVE, + g_param_spec_boolean ("is-live", "Is this a live source", + "True if the element cannot produce data in PAUSED", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source"); +} + +static void +gst_shm_src_init (GstShmSrc * self, GstShmSrcClass * g_class) +{ +} + + +static void +gst_shm_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstShmSrc *self = GST_SHM_SRC (object); + + switch (prop_id) { + case PROP_SOCKET_PATH: + GST_OBJECT_LOCK (object); + if (self->pipe) { + GST_WARNING_OBJECT (object, "Can not modify socket path while the " + "element is playing"); + } else { + g_free (self->socket_path); + self->socket_path = g_value_dup_string (value); + } + GST_OBJECT_UNLOCK (object); + break; + case PROP_IS_LIVE: + gst_base_src_set_live (GST_BASE_SRC (object), + g_value_get_boolean (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_shm_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstShmSrc *self = GST_SHM_SRC (object); + + switch (prop_id) { + case PROP_SOCKET_PATH: + GST_OBJECT_LOCK (object); + g_value_set_string (value, self->socket_path); + GST_OBJECT_UNLOCK (object); + break; + case PROP_IS_LIVE: + g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (object))); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +gst_shm_src_start (GstBaseSrc * bsrc) +{ + GstShmSrc *self = GST_SHM_SRC (bsrc); + GstShmPipe *gstpipe = g_slice_new0 (GstShmPipe); + + gstpipe->use_count = 1; + gstpipe->src = gst_object_ref (self); + + if (!self->socket_path) { + GST_ELEMENT_ERROR (bsrc, RESOURCE, NOT_FOUND, + ("No path specified for socket."), (NULL)); + return FALSE; + } + + GST_DEBUG ("Opening socket %s", self->socket_path); + + GST_OBJECT_LOCK (self); + gstpipe->pipe = sp_client_open (self->socket_path); + GST_OBJECT_UNLOCK (self); + + if (!gstpipe->pipe) { + GST_ELEMENT_ERROR (bsrc, RESOURCE, OPEN_READ_WRITE, + ("Could not open socket %s: %d %s", self->socket_path, errno, + strerror (errno)), (NULL)); + gst_shm_pipe_dec (gstpipe); + return FALSE; + } + + self->pipe = gstpipe; + + self->poll = gst_poll_new (TRUE); + gst_poll_fd_init (&self->pollfd); + self->pollfd.fd = sp_get_fd (self->pipe->pipe); + gst_poll_add_fd (self->poll, &self->pollfd); + gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE); + + return TRUE; +} + +static gboolean +gst_shm_src_stop (GstBaseSrc * bsrc) +{ + GstShmSrc *self = GST_SHM_SRC (bsrc); + + GST_DEBUG_OBJECT (self, "Stopping %p", self); + + if (self->pipe) { + gst_shm_pipe_dec (self->pipe); + self->pipe = NULL; + } + + gst_poll_free (self->poll); + self->poll = NULL; + + return TRUE; +} + + +static void +free_buffer (gpointer data) +{ + struct GstShmBuffer *gsb = data; + g_return_if_fail (gsb->pipe != NULL); + g_return_if_fail (gsb->pipe->src != NULL); + + GST_LOG ("Freeing buffer %p", gsb->buf); + + GST_OBJECT_LOCK (gsb->pipe->src); + sp_client_recv_finish (gsb->pipe->pipe, gsb->buf); + GST_OBJECT_UNLOCK (gsb->pipe->src); + + gst_shm_pipe_dec (gsb->pipe); + + g_slice_free (struct GstShmBuffer, gsb); +} + +static GstFlowReturn +gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) +{ + GstShmSrc *self = GST_SHM_SRC (psrc); + gchar *buf = NULL; + int rv = 0; + struct GstShmBuffer *gsb; + + do { + if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) { + if (errno == EBUSY) + return GST_FLOW_WRONG_STATE; + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"), + ("Poll failed on fd: %s", strerror (errno))); + return GST_FLOW_ERROR; + } + + if (self->unlocked) + return GST_FLOW_WRONG_STATE; + + if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"), + ("Control socket has closed")); + return GST_FLOW_ERROR; + } + + if (gst_poll_fd_has_error (self->poll, &self->pollfd)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"), + ("Control socket has error")); + return GST_FLOW_ERROR; + } + + if (gst_poll_fd_can_read (self->poll, &self->pollfd)) { + buf = NULL; + GST_LOG_OBJECT (self, "Reading from pipe"); + GST_OBJECT_LOCK (self); + rv = sp_client_recv (self->pipe->pipe, &buf); + GST_OBJECT_UNLOCK (self); + if (rv < 0) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"), + ("Error reading control data: %d", rv)); + return GST_FLOW_ERROR; + } + } + } while (buf == NULL); + + GST_LOG_OBJECT (self, "Got buffer %p of size %d %d", buf, rv); + + gsb = g_slice_new0 (struct GstShmBuffer); + gsb->buf = buf; + gsb->pipe = self->pipe; + gst_shm_pipe_inc (self->pipe); + + *outbuf = gst_buffer_new (); + GST_BUFFER_FLAG_SET (*outbuf, GST_BUFFER_FLAG_READONLY); + GST_BUFFER_DATA (*outbuf) = (guint8 *) buf; + GST_BUFFER_SIZE (*outbuf) = rv; + GST_BUFFER_MALLOCDATA (*outbuf) = (guint8 *) gsb; + GST_BUFFER_FREE_FUNC (*outbuf) = free_buffer; + + return GST_FLOW_OK; +} + +static gboolean +gst_shm_src_unlock (GstBaseSrc * bsrc) +{ + GstShmSrc *self = GST_SHM_SRC (bsrc); + + self->unlocked = TRUE; + + if (self->poll) + gst_poll_set_flushing (self->poll, TRUE); + + return TRUE; +} + +static gboolean +gst_shm_src_unlock_stop (GstBaseSrc * bsrc) +{ + GstShmSrc *self = GST_SHM_SRC (bsrc); + + self->unlocked = FALSE; + + if (self->poll) + gst_poll_set_flushing (self->poll, FALSE); + + return TRUE; +} + +static void +gst_shm_pipe_inc (GstShmPipe * pipe) +{ + g_return_if_fail (pipe); + g_return_if_fail (pipe->src); + g_return_if_fail (pipe->use_count > 0); + + GST_OBJECT_LOCK (pipe->src); + pipe->use_count++; + GST_OBJECT_UNLOCK (pipe->src); +} + +static void +gst_shm_pipe_dec (GstShmPipe * pipe) +{ + g_return_if_fail (pipe); + g_return_if_fail (pipe->src); + g_return_if_fail (pipe->use_count > 0); + + GST_OBJECT_LOCK (pipe->src); + pipe->use_count--; + + if (pipe->use_count > 0) { + GST_OBJECT_UNLOCK (pipe->src); + return; + } + + if (pipe->pipe) + sp_close (pipe->pipe); + GST_OBJECT_UNLOCK (pipe->src); + + gst_object_unref (pipe->src); + g_slice_free (GstShmPipe, pipe); +} diff --git a/sys/shm/gstshmsrc.h b/sys/shm/gstshmsrc.h new file mode 100644 index 000000000..79d7e8a18 --- /dev/null +++ b/sys/shm/gstshmsrc.h @@ -0,0 +1,76 @@ +/* GStreamer + * Copyright (C) <2009> Collabora Ltd + * @author: Olivier Crete <olivier.crete@collabora.co.uk + * Copyright (C) <2009> Nokia Inc + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_SHM_SRC_H__ +#define __GST_SHM_SRC_H__ + +#include <gst/gst.h> +#include <gst/base/gstpushsrc.h> +#include <gst/base/gstbasesrc.h> + +#include "shmpipe.h" + +G_BEGIN_DECLS +#define GST_TYPE_SHM_SRC \ + (gst_shm_src_get_type()) +#define GST_SHM_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SRC,GstShmSrc)) +#define GST_SHM_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SRC,GstShmSrcClass)) +#define GST_IS_SHM_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SRC)) +#define GST_IS_SHM_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SRC)) +typedef struct _GstShmSrc GstShmSrc; +typedef struct _GstShmSrcClass GstShmSrcClass; +typedef struct _GstShmPipe GstShmPipe; + +struct _GstShmSrc +{ + GstPushSrc element; + + gchar *socket_path; + + GstShmPipe *pipe; + GstPoll *poll; + GstPollFD pollfd; + + + GstFlowReturn flow_return; + gboolean unlocked; +}; + +struct _GstShmSrcClass +{ + GstPushSrcClass parent_class; +}; + +GType gst_shm_src_get_type (void); + +struct _GstShmPipe { + int use_count; + + GstShmSrc *src; + ShmPipe *pipe; +}; + +G_END_DECLS +#endif /* __GST_SHM_SRC_H__ */ diff --git a/sys/shm/shmalloc.c b/sys/shm/shmalloc.c new file mode 100644 index 000000000..75acfbcb2 --- /dev/null +++ b/sys/shm/shmalloc.c @@ -0,0 +1,153 @@ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "shmalloc.h" + +#include <stdio.h> +#include <string.h> +#include <assert.h> + +struct _ShmAllocSpace +{ + size_t size; + + ShmAllocBlock *blocks; +}; + +struct _ShmAllocBlock +{ + int use_count; + + ShmAllocSpace *space; + + unsigned long offset; + unsigned long size; + + ShmAllocBlock *next; +}; + + +ShmAllocSpace * +shm_alloc_space_new (size_t size) +{ + ShmAllocSpace *self = spalloc_new (ShmAllocSpace); + + memset (self, 0, sizeof (ShmAllocSpace)); + + self->size = size; + + return self; +} + +void +shm_alloc_space_free (ShmAllocSpace * self) +{ + assert (self && self->blocks == NULL); + spalloc_free (ShmAllocSpace, self); +} + + +ShmAllocBlock * +shm_alloc_space_alloc_block (ShmAllocSpace * self, unsigned long size) +{ + ShmAllocBlock *block; + ShmAllocBlock *item = NULL; + ShmAllocBlock *prev_item = NULL; + unsigned long prev_end_offset = 0; + + + for (item = self->blocks; item; item = item->next) { + unsigned long max_size = 0; + + max_size = item->offset - prev_end_offset; + + if (max_size >= size) + break; + + prev_end_offset = item->offset + item->size; + prev_item = item; + } + + /* Did not find space before an existing block */ + if (self->blocks && !item) { + /* Return NULL if there is no big enough space, otherwise, there is space + * at the end */ + if (self->size - prev_end_offset < size) + return NULL; + } + + block = spalloc_new (ShmAllocBlock); + memset (block, 0, sizeof (ShmAllocBlock)); + block->offset = prev_end_offset; + block->size = size; + block->use_count = 1; + block->space = self; + + if (prev_item) + prev_item->next = block; + else + self->blocks = block; + + block->next = item; + + return block; +} + +unsigned long +shm_alloc_space_alloc_block_get_offset (ShmAllocBlock * block) +{ + return block->offset; +} + +static void +shm_alloc_space_free_block (ShmAllocBlock * block) +{ + ShmAllocBlock *item = NULL; + ShmAllocBlock *prev_item = NULL; + ShmAllocSpace *self = block->space; + + for (item = self->blocks; item; item = item->next) { + if (item == block) { + if (prev_item) + prev_item->next = item->next; + else + self->blocks = item->next; + break; + } + prev_item = item; + } + + spalloc_free (ShmAllocBlock, block); +} + +ShmAllocBlock * +shm_alloc_space_block_get (ShmAllocSpace * self, unsigned long offset) +{ + ShmAllocBlock *block = NULL; + + for (block = self->blocks; block; block = block->next) { + if (block->offset <= offset && (block->offset + block->size) > offset) + return block; + } + + return NULL; +} + + +void +shm_alloc_space_block_inc (ShmAllocBlock * block) +{ + block->use_count++; +} + +void +shm_alloc_space_block_dec (ShmAllocBlock * block) +{ + block->use_count--; + + if (block->use_count <= 0) + shm_alloc_space_free_block (block); +} diff --git a/sys/shm/shmalloc.h b/sys/shm/shmalloc.h new file mode 100644 index 000000000..6a0609ddb --- /dev/null +++ b/sys/shm/shmalloc.h @@ -0,0 +1,47 @@ + +#include <stdlib.h> + +#ifndef __SHMALLOC_H__ +#define __SHMALLOC_H__ + +#ifdef GST_PACKAGE_NAME +#include <glib.h> + +#define spalloc_new(type) g_slice_new (type) +#define spalloc_alloc(size) g_slice_alloc (size) + +#define spalloc_free(type, buf) g_slice_free (type, buf) +#define spalloc_free1(size, buf) g_slice_free1 (size, buf) + +#else + +#define spalloc_new(type) malloc (sizeof (type)) +#define spalloc_alloc(size) malloc (size) + +#define spalloc_free(type, buf) free (buf) +#define spalloc_free1(size, buf) free (buf) + +#endif + +typedef struct _ShmAllocSpace ShmAllocSpace; +typedef struct _ShmAllocBlock ShmAllocBlock; + +ShmAllocSpace *shm_alloc_space_new (size_t size); +void shm_alloc_space_free (ShmAllocSpace * self); + + +ShmAllocBlock *shm_alloc_space_alloc_block (ShmAllocSpace * self, + unsigned long size); +unsigned long shm_alloc_space_alloc_block_get_offset (ShmAllocBlock *block); + +void shm_alloc_space_block_inc (ShmAllocBlock * block); +void shm_alloc_space_block_dec (ShmAllocBlock * block); +ShmAllocBlock * shm_alloc_space_block_get (ShmAllocSpace * space, + unsigned long offset); + + +#ifdef __cplusplus +} +#endif + +#endif /* __SHMALLOC_H__ */ diff --git a/sys/shm/shmpipe.c b/sys/shm/shmpipe.c new file mode 100644 index 000000000..53fc6df8a --- /dev/null +++ b/sys/shm/shmpipe.c @@ -0,0 +1,831 @@ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "shmpipe.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> +#include <unistd.h> +#include <fcntl.h> +#include <limits.h> +#include <sys/mman.h> +#include <assert.h> + +/* + * The protocol over the pipe is in packets + * + * The defined types are: + * type 1: new shm area + * Area length + * Size of path (followed by path) + * + * type 2: Close shm area: + * No payload + * + * type 3: shm buffer + * offset + * bufsize + * + * type 4: ack buffer + * offset + * + * Type 4 goes from the client to the server + * The rest are from the server to the client + * The client should never write in the SHM + */ + + +#include "shmalloc.h" + +enum +{ + COMMAND_NEW_SHM_AREA = 1, + COMMAND_CLOSE_SHM_AREA = 2, + COMMAND_NEW_BUFFER = 3, + COMMAND_ACK_BUFFER = 4 +}; + +typedef struct _ShmArea ShmArea; +typedef struct _ShmBuffer ShmBuffer; + +struct _ShmArea +{ + int id; + + int use_count; + + int shm_fd; + + char *shm_area; + size_t shm_area_len; + + char *shm_area_name; + + ShmAllocSpace *allocspace; + + ShmArea *next; +}; + +struct _ShmBuffer +{ + int use_count; + + ShmArea *shm_area; + unsigned long offset; + size_t size; + + ShmAllocBlock *block; + + ShmBuffer *next; + + int num_clients; + int clients[0]; +}; + + +struct _ShmPipe +{ + int main_socket; + char *socket_path; + + ShmArea *shm_area; + + int next_area_id; + + ShmBuffer *buffers; + + int num_clients; + ShmClient *clients; + + mode_t perms; +}; + +struct _ShmClient +{ + int fd; + + ShmClient *next; +}; + +struct _ShmBlock +{ + ShmPipe *pipe; + ShmArea *area; + ShmAllocBlock *ablock; +}; + +struct CommandBuffer +{ + unsigned int type; + int area_id; + + union + { + struct + { + size_t size; + unsigned int path_size; + /* Followed by path */ + } new_shm_area; + struct + { + unsigned long offset; + unsigned long size; + } buffer; + struct + { + unsigned long offset; + } ack_buffer; + } payload; +}; + +static ShmArea *sp_open_shm (char *path, int id, int writer, mode_t perms, + size_t size); +static void sp_close_shm (ShmPipe * self, ShmArea * area); +static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, + ShmBuffer * prev_buf); +static void sp_shm_area_dec (ShmPipe * self, ShmArea * area); + + + +#define RETURN_ERROR(format, ...) do { \ + fprintf (stderr, format, __VA_ARGS__); \ + sp_close (self); \ + return NULL; } while (0) + +ShmPipe * +sp_writer_create (const char *path, size_t size, mode_t perms) +{ + ShmPipe *self = spalloc_new (ShmPipe); + int flags; + struct sockaddr_un sun; + int i = 0; + + memset (self, 0, sizeof (ShmPipe)); + + self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0); + + if (self->main_socket < 0) { + RETURN_ERROR ("Could not create socket (%d): %s\n", errno, + strerror (errno)); + } + + flags = fcntl (self->main_socket, F_GETFL, 0); + if (flags < 0) { + RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno)); + } + + if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0) { + RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno)); + } + + sun.sun_family = AF_UNIX; + strncpy (sun.sun_path, path, sizeof (sun.sun_path) - 1); + + while (bind (self->main_socket, (struct sockaddr *) &sun, + sizeof (struct sockaddr_un)) < 0) { + if (errno != EADDRINUSE) + RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno)); + + if (i > 256) + RETURN_ERROR ("Could not find a free socket name for %s", path); + + snprintf (sun.sun_path, sizeof (sun.sun_path), "%s.%d", path, i); + i++; + } + + self->socket_path = strdup (sun.sun_path); + + if (listen (self->main_socket, 10) < 0) { + RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno)); + } + + self->shm_area = sp_open_shm (NULL, ++self->next_area_id, 1, perms, size); + + self->perms = perms; + + if (!self->shm_area) { + sp_close (self); + return NULL; + } + + return self; +} + +#undef RETURN_ERROR + +#define RETURN_ERROR(format, ...) \ + fprintf (stderr, format, __VA_ARGS__); \ + sp_shm_area_dec (NULL, area); \ + return NULL; + +static ShmArea * +sp_open_shm (char *path, int id, int writer, mode_t perms, size_t size) +{ + ShmArea *area = spalloc_new (ShmArea); + char tmppath[PATH_MAX]; + int flags; + int prot; + int i = 0; + + memset (area, 0, sizeof (ShmArea)); + + area->use_count = 1; + + area->shm_area_len = size; + + + if (writer) + flags = O_RDWR | O_CREAT | O_TRUNC | O_EXCL; + else + flags = O_RDONLY; + + area->shm_fd = -1; + + if (path) { + area->shm_fd = shm_open (path, flags, perms); + } else { + do { + snprintf (tmppath, PATH_MAX, "/shmpipe.5%d.%5d", getpid (), i++); + area->shm_fd = shm_open (tmppath, flags, perms); + } while (area->shm_fd < 0 && errno == EEXIST); + } + + if (area->shm_fd < 0) { + RETURN_ERROR ("shm_open failed on %s (%d): %s\n", + path ? path : tmppath, errno, strerror (errno)); + } + + if (!path) + area->shm_area_name = strdup (tmppath); + + if (writer) { + if (ftruncate (area->shm_fd, size)) { + RETURN_ERROR ("Could not resize memory area to header size," + " ftruncate failed (%d): %s\n", errno, strerror (errno)); + } + } + + if (writer) + prot = PROT_READ | PROT_WRITE; + else + prot = PROT_READ; + + area->shm_area = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0); + + if (area->shm_area == MAP_FAILED) { + RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno)); + } + + area->id = id; + + if (writer) + area->allocspace = shm_alloc_space_new (area->shm_area_len); + + return area; +} + +#undef RETURN_ERROR + +static void +sp_close_shm (ShmPipe * self, ShmArea * area) +{ + assert (area->use_count == 0); + + if (area->allocspace) + shm_alloc_space_free (area->allocspace); + + if (self != NULL) { + ShmArea *item = NULL; + ShmArea *prev_item = NULL; + + for (item = self->shm_area; item; item = item->next) { + if (item == area) { + if (prev_item) + prev_item->next = item->next; + else + self->shm_area = item->next; + break; + } + prev_item = item; + } + assert (item); + } + + if (area->shm_area != MAP_FAILED) + munmap (area->shm_area, area->shm_area_len); + + if (area->shm_fd >= 0) + close (area->shm_fd); + + if (area->shm_area_name) { + shm_unlink (area->shm_area_name); + free (area->shm_area_name); + } + + spalloc_free (ShmArea, area); +} + +static void +sp_shm_area_inc (ShmArea * area) +{ + area->use_count++; +} + +static void +sp_shm_area_dec (ShmPipe * self, ShmArea * area) +{ + assert (area->use_count > 0); + area->use_count--; + + if (area->use_count == 0) { + sp_close_shm (self, area); + } +} + +void +sp_close (ShmPipe * self) +{ + if (self->main_socket >= 0) + close (self->main_socket); + + if (self->socket_path) { + unlink (self->socket_path); + free (self->socket_path); + } + + while (self->clients) + sp_writer_close_client (self, self->clients); + + while (self->shm_area) { + sp_shm_area_dec (self, self->shm_area); + } + + spalloc_free (ShmPipe, self); +} + +int +sp_writer_setperms_shm (ShmPipe * self, mode_t perms) +{ + self->perms = perms; + return fchmod (self->shm_area->shm_fd, perms); +} + +static int +send_command (int fd, struct CommandBuffer *cb, unsigned short int type, + int area_id) +{ + cb->type = type; + cb->area_id = area_id; + + if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) != + sizeof (struct CommandBuffer)) + return 0; + + return 1; +} + +int +sp_writer_resize (ShmPipe * self, size_t size) +{ + ShmArea *newarea; + ShmArea *old_current; + ShmClient *client; + int c = 0; + int pathlen; + + if (self->shm_area->shm_area_len == size) + return 0; + + newarea = sp_open_shm (NULL, ++self->next_area_id, 1, self->perms, size); + + if (!newarea) + return -1; + + old_current = self->shm_area; + newarea->next = self->shm_area; + self->shm_area = newarea; + + pathlen = strlen (newarea->shm_area_name) + 1; + + for (client = self->clients; client; client = client->next) { + struct CommandBuffer cb = { 0 }; + + if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA, + old_current->id)) + continue; + + cb.payload.new_shm_area.size = newarea->shm_area_len; + cb.payload.new_shm_area.path_size = pathlen; + if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id)) + continue; + + if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) != + pathlen) + continue; + c++; + } + + sp_shm_area_dec (self, old_current); + + + return c; +} + +ShmBlock * +sp_writer_alloc_block (ShmPipe * self, size_t size) +{ + ShmBlock *block; + ShmAllocBlock *ablock = + shm_alloc_space_alloc_block (self->shm_area->allocspace, size); + + if (!ablock) + return NULL; + + block = spalloc_new (ShmBlock); + sp_shm_area_inc (self->shm_area); + block->pipe = self; + block->area = self->shm_area; + block->ablock = ablock; + return block; +} + +char * +sp_writer_block_get_buf (ShmBlock * block) +{ + return block->area->shm_area + + shm_alloc_space_alloc_block_get_offset (block->ablock); +} + +void +sp_writer_free_block (ShmBlock * block) +{ + shm_alloc_space_block_dec (block->ablock); + sp_shm_area_dec (block->pipe, block->area); + spalloc_free (ShmBlock, block); +} + +/* Returns the number of client this has successfully been sent to */ + +int +sp_writer_send_buf (ShmPipe * self, char *buf, size_t size) +{ + ShmArea *area = NULL; + unsigned long offset = 0; + unsigned long bsize = size; + ShmBuffer *sb; + ShmClient *client = NULL; + ShmAllocBlock *block = NULL; + int i = 0; + int c = 0; + + if (self->num_clients == 0) + return 0; + + for (area = self->shm_area; area; area = area->next) { + if (buf >= area->shm_area && buf < (area->shm_area + area->shm_area_len)) { + offset = buf - area->shm_area; + block = shm_alloc_space_block_get (area->allocspace, offset); + assert (block); + break; + } + } + + if (!block) + return -1; + + sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients); + memset (sb, 0, sizeof (ShmBuffer)); + memset (sb->clients, -1, sizeof (int) * self->num_clients); + sb->shm_area = area; + sb->offset = offset; + sb->size = size; + sb->num_clients = self->num_clients; + sb->block = block; + + for (client = self->clients; client; client = client->next) { + struct CommandBuffer cb = { 0 }; + cb.payload.buffer.offset = offset; + cb.payload.buffer.size = bsize; + if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id)) + continue; + sb->clients[i++] = client->fd; + c++; + } + + if (c == 0) { + spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * self->num_clients, sb); + return 0; + } + + sp_shm_area_inc (area); + shm_alloc_space_block_inc (block); + + sb->use_count = c; + + sb->next = self->buffers; + self->buffers = sb; + + return c; +} + +static int +recv_command (int fd, struct CommandBuffer *cb) +{ + int retval; + + retval = recv (fd, cb, sizeof (struct CommandBuffer), MSG_DONTWAIT); + if (retval == sizeof (struct CommandBuffer)) { + return 1; + } else { + return 0; + } +} + +unsigned long +sp_client_recv (ShmPipe * self, char **buf) +{ + char *area_name = NULL; + ShmArea *newarea, *oldarea; + ShmArea *area; + struct CommandBuffer cb; + int retval; + + if (!recv_command (self->main_socket, &cb)) + return -1; + + switch (cb.type) { + case COMMAND_NEW_SHM_AREA: + assert (cb.payload.new_shm_area.path_size > 0); + assert (cb.payload.new_shm_area.size > 0); + + area_name = malloc (cb.payload.new_shm_area.path_size); + retval = recv (self->main_socket, area_name, + cb.payload.new_shm_area.path_size, 0); + if (retval != cb.payload.new_shm_area.path_size) { + free (area_name); + return -3; + } + + newarea = sp_open_shm (area_name, cb.area_id, 0, 0, + cb.payload.new_shm_area.size); + free (area_name); + if (!newarea) + return -4; + + oldarea = self->shm_area; + newarea->next = self->shm_area; + self->shm_area = newarea; + /* + if (oldarea) + sp_shm_area_dec (self, oldarea); + */ + break; + + case COMMAND_CLOSE_SHM_AREA: + for (area = self->shm_area; area; area = area->next) { + if (area->id == cb.area_id) { + sp_shm_area_dec (self, area); + break; + } + } + break; + + case COMMAND_NEW_BUFFER: + assert (buf); + for (area = self->shm_area; area; area = area->next) { + if (area->id == cb.area_id) { + *buf = area->shm_area + cb.payload.buffer.offset; + sp_shm_area_inc (area); + return cb.payload.buffer.size; + } + } + return -23; + + default: + return -99; + } + + return 0; +} + +int +sp_writer_recv (ShmPipe * self, ShmClient * client) +{ + ShmBuffer *buf = NULL, *prev_buf = NULL; + struct CommandBuffer cb; + + if (!recv_command (client->fd, &cb)) + return -1; + + switch (cb.type) { + case COMMAND_ACK_BUFFER: + + for (buf = self->buffers; buf; buf = buf->next) { + if (buf->shm_area->id == cb.area_id && + buf->offset == cb.payload.ack_buffer.offset) { + sp_shmbuf_dec (self, buf, prev_buf); + break; + } + prev_buf = buf; + } + + if (!buf) + return -2; + + break; + default: + return -99; + } + + return 0; +} + +int +sp_client_recv_finish (ShmPipe * self, char *buf) +{ + ShmArea *shm_area = NULL; + unsigned long offset; + struct CommandBuffer cb = { 0 }; + + for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) { + if (buf >= shm_area->shm_area && + buf < shm_area->shm_area + shm_area->shm_area_len) + break; + } + + assert (shm_area); + + offset = buf - shm_area->shm_area; + + sp_shm_area_dec (self, shm_area); + + cb.payload.ack_buffer.offset = offset; + return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER, + self->shm_area->id); +} + +ShmPipe * +sp_client_open (const char *path) +{ + ShmPipe *self = spalloc_new (ShmPipe); + struct sockaddr_un sun; + + memset (self, 0, sizeof (ShmPipe)); + + self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0); + if (self->main_socket < 0) { + sp_close (self); + return NULL; + } + + sun.sun_family = AF_UNIX; + strncpy (sun.sun_path, path, sizeof (sun.sun_path) - 1); + + if (connect (self->main_socket, (struct sockaddr *) &sun, + sizeof (struct sockaddr_un)) < 0) + goto error; + + return self; + +error: + spalloc_free (ShmPipe, self); + return NULL; +} + + +ShmClient * +sp_writer_accept_client (ShmPipe * self) +{ + ShmClient *client = NULL; + int fd; + struct CommandBuffer cb = { 0 }; + int pathlen = strlen (self->shm_area->shm_area_name) + 1; + + + fd = accept (self->main_socket, NULL, NULL); + + if (fd < 0) { + fprintf (stderr, "Could not client connection"); + return NULL; + } + + cb.payload.new_shm_area.size = self->shm_area->shm_area_len; + cb.payload.new_shm_area.path_size = pathlen; + if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) { + fprintf (stderr, "Sending new shm area failed: %s", strerror (errno)); + goto error; + } + + if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) != + pathlen) { + fprintf (stderr, "Sending new shm area path failed: %s", strerror (errno)); + goto error; + } + + client = spalloc_new (ShmClient); + client->fd = fd; + + /* Prepend ot linked list */ + client->next = self->clients; + self->clients = client; + self->num_clients++; + + return client; + +error: + close (fd); + return NULL; +} + +static int +sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf) +{ + buf->use_count--; + + if (buf->use_count == 0) { + /* Remove from linked list */ + if (prev_buf) + prev_buf->next = buf->next; + else + self->buffers = buf->next; + + shm_alloc_space_block_dec (buf->block); + sp_shm_area_dec (self, buf->shm_area); + spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf); + return 0; + } + + return 1; +} + +void +sp_writer_close_client (ShmPipe * self, ShmClient * client) +{ + ShmBuffer *buffer = NULL, *prev_buf = NULL; + ShmClient *item = NULL, *prev_item = NULL; + + close (client->fd); + +again: + for (buffer = self->buffers; buffer; buffer = buffer->next) { + int i; + + for (i = 0; i < buffer->num_clients; i++) { + if (buffer->clients[i] == client->fd) { + buffer->clients[i] = -1; + if (!sp_shmbuf_dec (self, buffer, prev_buf)) + goto again; + break; + } + prev_buf = buffer; + } + } + + for (item = self->clients; item; item = item->next) { + if (item == client) + break; + prev_item = item; + } + assert (item); + + if (prev_item) + prev_item->next = client->next; + else + self->clients = client->next; + + self->num_clients--; + + spalloc_free (ShmClient, client); +} + +int +sp_get_fd (ShmPipe * self) +{ + return self->main_socket; +} + +int +sp_writer_get_client_fd (ShmClient * client) +{ + return client->fd; +} + +int +sp_writer_pending_writes (ShmPipe * self) +{ + return (self->buffers != NULL); +} + +const char * +sp_writer_get_path (ShmPipe * pipe) +{ + return pipe->socket_path; +} diff --git a/sys/shm/shmpipe.h b/sys/shm/shmpipe.h new file mode 100644 index 000000000..f3657b676 --- /dev/null +++ b/sys/shm/shmpipe.h @@ -0,0 +1,78 @@ +/* + * + * First, create a writer with sp_writer_create() + * And selectes() on the socket from sp_get_fd() + * If the socket is closed or there are errors from any function, the app + * should call sp_close() and assume the writer is dead + * The server calls sp_writer_accept_client() when there is something to read + * from the server fd + * It then needs to select() on the socket from sp_writer_get_client_fd() + * If it gets an error on that socket, it call sp_writer_close_client(). + * If there is something to read, it calls sp_writer_recv(). + * + * The writer allocates buffers with sp_writer_alloc_block(), + * writes something in the buffer (retrieved with sp_writer_block_get_buf(), + * then calls sp_writer_send_buf() to send the buffer or a subsection to + * the other side. When it is done with the block, it calls + * sp_writer_free_block(). + * If alloc fails, then the server must wait for events from the clients before + * trying again. + * + * + * The clients connect with sp_client_open() + * And select() on the fd from sp_get_fd() until there is something to read. + * Then they must read using sp_client_recv() which will return > 0 if there + * is a valid buffer (which is read only). It will return 0 if it is an internal + * message and <0 if there was an error. If there was an error, one must close + * it with sp_close(). If was valid buffer was received, the client must release + * it with sp_client_recv_finish() when it is done reading from it. + */ + + +#ifndef __SHMPIPE_H__ +#define __SHMPIPE_H__ + +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _ShmClient ShmClient; +typedef struct _ShmPipe ShmPipe; +typedef struct _ShmBlock ShmBlock; + +ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms); +const char *sp_writer_get_path (ShmPipe *pipe); +void sp_close (ShmPipe * self); + +int sp_writer_setperms_shm (ShmPipe * self, mode_t perms); +int sp_writer_resize (ShmPipe * self, size_t size); + +int sp_get_fd (ShmPipe * self); +int sp_writer_get_client_fd (ShmClient * client); + +ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size); +void sp_writer_free_block (ShmBlock *block); +int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size); +char *sp_writer_block_get_buf (ShmBlock *block); + +ShmClient * sp_writer_accept_client (ShmPipe * self); +void sp_writer_close_client (ShmPipe *self, ShmClient * client); +int sp_writer_recv (ShmPipe * self, ShmClient * client); + +int sp_writer_pending_writes (ShmPipe * self); + +ShmPipe *sp_client_open (const char *path); +unsigned long sp_client_recv (ShmPipe * self, char **buf); +int sp_client_recv_finish (ShmPipe * self, char *buf); + +#ifdef __cplusplus +} +#endif + +#endif /* __SHMPIPE_H__ */ |