summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/Makefile.am8
-rw-r--r--ext/sctp/Makefile.am21
-rw-r--r--ext/sctp/gstsctpdec.c655
-rw-r--r--ext/sctp/gstsctpdec.h66
-rw-r--r--ext/sctp/gstsctpenc.c937
-rw-r--r--ext/sctp/gstsctpenc.h77
-rw-r--r--ext/sctp/gstsctpplugin.c53
-rw-r--r--ext/sctp/sctpassociation.c836
-rw-r--r--ext/sctp/sctpassociation.h123
9 files changed, 2776 insertions, 0 deletions
diff --git a/ext/Makefile.am b/ext/Makefile.am
index 45131153a..4a18dd869 100644
--- a/ext/Makefile.am
+++ b/ext/Makefile.am
@@ -286,6 +286,12 @@ else
SBC_DIR=
endif
+if USE_SCTP
+SCTP_DIR=sctp
+else
+SCTP_DIR=
+endif
+
if USE_SMOOTHSTREAMING
SMOOTHSTREAMING_DIR = smoothstreaming
else
@@ -455,6 +461,7 @@ SUBDIRS=\
$(OPUS_DIR) \
$(RSVG_DIR) \
$(SBC_DIR) \
+ $(SCTP_DIR) \
$(SMOOTHSTREAMING_DIR) \
$(SMOOTHWAVE_DIR) \
$(SNDFILE_DIR) \
@@ -523,6 +530,7 @@ DIST_SUBDIRS = \
rsvg \
resindvd \
sbc \
+ sctp \
smoothstreaming \
sndfile \
soundtouch \
diff --git a/ext/sctp/Makefile.am b/ext/sctp/Makefile.am
new file mode 100644
index 000000000..7f535ee69
--- /dev/null
+++ b/ext/sctp/Makefile.am
@@ -0,0 +1,21 @@
+plugin_LTLIBRARIES = libgstsctp.la
+
+libgstsctp_la_SOURCES = \
+ gstsctpplugin.c \
+ sctpassociation.c \
+ gstsctpenc.c \
+ gstsctpdec.c
+
+libgstsctp_la_CFLAGS = \
+ $(GST_PLUGINS_BASE_CFLAGS) \
+ $(GST_BASE_CFLAGS) \
+ $(GST_CFLAGS) \
+ $(USRSCTP_CFLAGS) \
+ -I$(top_srcdir)/gst-libs
+
+libgstsctp_la_LIBADD = $(GST_LIBS) $(GST_BASE_LIBS) $(USRSCTP_LIBS) $(top_builddir)/gst-libs/gst/sctp/libgstsctp-1.0.la
+
+noinst_HEADERS = \
+ sctpassociation.h \
+ gstsctpenc.h \
+ gstsctpdec.h
diff --git a/ext/sctp/gstsctpdec.c b/ext/sctp/gstsctpdec.c
new file mode 100644
index 000000000..ccc5ec12f
--- /dev/null
+++ b/ext/sctp/gstsctpdec.c
@@ -0,0 +1,655 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "gstsctpdec.h"
+
+#include <gst/sctp/sctpreceivemeta.h>
+#include <gst/base/gstdataqueue.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_sctp_dec_debug_category);
+#define GST_CAT_DEFAULT gst_sctp_dec_debug_category
+
+#define gst_sctp_dec_parent_class parent_class
+G_DEFINE_TYPE (GstSctpDec, gst_sctp_dec, GST_TYPE_ELEMENT);
+
+static GstStaticPadTemplate sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK,
+ GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
+
+static GstStaticPadTemplate src_template =
+GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC,
+ GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY);
+
+enum
+{
+ SIGNAL_RESET_STREAM,
+ NUM_SIGNALS
+};
+
+static guint signals[NUM_SIGNALS];
+
+enum
+{
+ PROP_0,
+
+ PROP_GST_SCTP_ASSOCIATION_ID,
+ PROP_LOCAL_SCTP_PORT,
+
+ NUM_PROPERTIES
+};
+
+static GParamSpec *properties[NUM_PROPERTIES];
+
+#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
+#define DEFAULT_LOCAL_SCTP_PORT 0
+#define MAX_SCTP_PORT 65535
+#define MAX_GST_SCTP_ASSOCIATION_ID 65535
+#define MAX_STREAM_ID 65535
+
+GType gst_sctp_dec_pad_get_type (void);
+
+#define GST_TYPE_SCTP_DEC_PAD (gst_sctp_dec_pad_get_type())
+#define GST_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPad))
+#define GST_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPadClass))
+#define GST_IS_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC_PAD))
+#define GST_IS_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC_PAD))
+
+typedef struct _GstSctpDecPad GstSctpDecPad;
+typedef GstPadClass GstSctpDecPadClass;
+
+struct _GstSctpDecPad
+{
+ GstPad parent;
+
+ GstDataQueue *packet_queue;
+};
+
+G_DEFINE_TYPE (GstSctpDecPad, gst_sctp_dec_pad, GST_TYPE_PAD);
+
+static void
+gst_sctp_dec_pad_finalize (GObject * object)
+{
+ GstSctpDecPad *self = GST_SCTP_DEC_PAD (object);
+
+ gst_object_unref (self->packet_queue);
+
+ G_OBJECT_CLASS (gst_sctp_dec_pad_parent_class)->finalize (object);
+}
+
+static gboolean
+data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
+ guint64 time, gpointer user_data)
+{
+ /* FIXME: Are we full at some point and block? */
+ return FALSE;
+}
+
+static void
+data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
+{
+}
+
+static void
+data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
+{
+}
+
+static void
+gst_sctp_dec_pad_class_init (GstSctpDecPadClass * klass)
+{
+ GObjectClass *gobject_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->finalize = gst_sctp_dec_pad_finalize;
+}
+
+static void
+gst_sctp_dec_pad_init (GstSctpDecPad * self)
+{
+ self->packet_queue = gst_data_queue_new (data_queue_check_full_cb,
+ data_queue_full_cb, data_queue_empty_cb, NULL);
+}
+
+static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
+ GstStateChange transition);
+static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
+ GstBuffer * buf);
+static gboolean gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self,
+ GstEvent * event);
+static void gst_sctp_data_srcpad_loop (GstPad * pad);
+
+static gboolean configure_association (GstSctpDec * self);
+static void on_gst_sctp_association_stream_reset (GstSctpAssociation *
+ gst_sctp_association, guint16 stream_id, GstSctpDec * self);
+static void on_receive (GstSctpAssociation * gst_sctp_association, guint8 * buf,
+ gsize length, guint16 stream_id, guint ppid, gpointer user_data);
+static void stop_srcpad_task (GstPad * pad);
+static void stop_all_srcpad_tasks (GstSctpDec * self);
+static void sctpdec_cleanup (GstSctpDec * self);
+static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
+static void remove_pad (GstElement * element, GstPad * pad);
+static void on_reset_stream (GstSctpDec * self, guint stream_id);
+
+static void
+gst_sctp_dec_class_init (GstSctpDecClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *element_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+ element_class = GST_ELEMENT_CLASS (klass);
+
+ GST_DEBUG_CATEGORY_INIT (gst_sctp_dec_debug_category,
+ "sctpdec", 0, "debug category for sctpdec element");
+
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&sink_template));
+
+ gobject_class->set_property = gst_sctp_dec_set_property;
+ gobject_class->get_property = gst_sctp_dec_get_property;
+
+ element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
+
+ klass->on_reset_stream = on_reset_stream;
+
+ properties[PROP_GST_SCTP_ASSOCIATION_ID] =
+ g_param_spec_uint ("sctp-association-id",
+ "SCTP Association ID",
+ "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
+ "This value must be set before any pads are requested.",
+ 0, MAX_GST_SCTP_ASSOCIATION_ID, DEFAULT_GST_SCTP_ASSOCIATION_ID,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_LOCAL_SCTP_PORT] =
+ g_param_spec_uint ("local-sctp-port",
+ "Local SCTP port",
+ "Local sctp port for the sctp association. The remote port is configured via the "
+ "GstSctpEnc element.",
+ 0, MAX_SCTP_PORT, DEFAULT_LOCAL_SCTP_PORT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
+
+ signals[SIGNAL_RESET_STREAM] = g_signal_new ("reset-stream",
+ G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstSctpDecClass, on_reset_stream), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ gst_element_class_set_static_metadata (element_class,
+ "SCTP Decoder",
+ "Decoder/Network/SCTP",
+ "Decodes packets with SCTP",
+ "George Kiagiadakis <george.kiagiadakis@collabora.com>");
+}
+
+static void
+gst_sctp_dec_init (GstSctpDec * self)
+{
+ self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
+ self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
+
+ self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
+ gst_pad_set_chain_function (self->sink_pad,
+ GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
+ gst_pad_set_event_function (self->sink_pad,
+ GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_packet_event));
+
+ gst_element_add_pad (GST_ELEMENT (self), self->sink_pad);
+}
+
+static void
+gst_sctp_dec_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstSctpDec *self = GST_SCTP_DEC (object);
+
+ switch (prop_id) {
+ case PROP_GST_SCTP_ASSOCIATION_ID:
+ self->sctp_association_id = g_value_get_uint (value);
+ break;
+ case PROP_LOCAL_SCTP_PORT:
+ self->local_sctp_port = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
+ GParamSpec * pspec)
+{
+ GstSctpDec *self = GST_SCTP_DEC (object);
+
+ switch (prop_id) {
+ case PROP_GST_SCTP_ASSOCIATION_ID:
+ g_value_set_uint (value, self->sctp_association_id);
+ break;
+ case PROP_LOCAL_SCTP_PORT:
+ g_value_set_uint (value, self->local_sctp_port);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
+{
+ GstSctpDec *self = GST_SCTP_DEC (element);
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ if (!configure_association (self))
+ ret = GST_STATE_CHANGE_FAILURE;
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ sctpdec_cleanup (self);
+ break;
+ default:
+ break;
+ }
+
+ if (ret != GST_STATE_CHANGE_FAILURE)
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ return ret;
+}
+
+static GstFlowReturn
+gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
+{
+ GstMapInfo map;
+
+ if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
+ GST_WARNING_OBJECT (self, "Could not map GstBuffer");
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+
+ gst_sctp_association_incoming_packet (self->sctp_association,
+ (guint8 *) map.data, (guint32) map.size);
+ gst_buffer_unmap (buf, &map);
+ gst_buffer_unref (buf);
+
+ return GST_FLOW_OK;
+}
+
+static void
+flush_srcpad (const GValue * item, gpointer user_data)
+{
+ GstSctpDecPad *sctpdec_pad = g_value_get_object (item);
+ gboolean flush = GPOINTER_TO_INT (user_data);
+
+ if (flush) {
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+ } else {
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
+ gst_pad_start_task (GST_PAD (sctpdec_pad),
+ (GstTaskFunction) gst_sctp_data_srcpad_loop, sctpdec_pad, NULL);
+ }
+}
+
+static gboolean
+gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
+{
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_STREAM_START:
+ case GST_EVENT_CAPS:
+ /* We create our own stream-start events and the caps event does not
+ * make sense */
+ gst_event_unref (event);
+ return TRUE;
+ case GST_EVENT_EOS:
+ /* Drop this, we're never EOS until shut down */
+ gst_event_unref (event);
+ return TRUE;
+ case GST_EVENT_FLUSH_START:{
+ GstIterator *it;
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, flush_srcpad,
+ GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ case GST_EVENT_FLUSH_STOP:{
+ GstIterator *it;
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, flush_srcpad,
+ GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ default:
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+}
+
+static void
+gst_sctp_data_srcpad_loop (GstPad * pad)
+{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+ GstDataQueueItem *item;
+
+ if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
+ GstFlowReturn flow_ret;
+
+ flow_ret = gst_pad_push (pad, GST_BUFFER (item->object));
+ item->object = NULL;
+ if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
+ || flow_ret == GST_FLOW_NOT_LINKED)) {
+ GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
+ gst_flow_get_name (flow_ret));
+ } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
+ GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
+ gst_flow_get_name (flow_ret));
+ }
+
+ if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
+ GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+ gst_pad_pause_task (pad);
+ }
+
+ item->destroy (item);
+ } else {
+ GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
+ gst_pad_pause_task (pad);
+ }
+}
+
+static gboolean
+configure_association (GstSctpDec * self)
+{
+ gint state;
+
+ self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
+
+ g_object_get (self->sctp_association, "state", &state, NULL);
+
+ if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
+ GST_WARNING_OBJECT (self,
+ "Could not configure SCTP association. Association already in use!");
+ g_object_unref (self->sctp_association);
+ self->sctp_association = NULL;
+ goto error;
+ }
+
+ self->signal_handler_stream_reset =
+ g_signal_connect_object (self->sctp_association, "stream-reset",
+ G_CALLBACK (on_gst_sctp_association_stream_reset), self, 0);
+
+ g_object_bind_property (self, "local-sctp-port", self->sctp_association,
+ "local-port", G_BINDING_SYNC_CREATE);
+
+ gst_sctp_association_set_on_packet_received (self->sctp_association,
+ on_receive, self);
+
+ return TRUE;
+error:
+ return FALSE;
+}
+
+static gboolean
+gst_sctp_dec_src_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
+{
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_RECONFIGURE:
+ case GST_EVENT_FLUSH_STOP:{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+
+ /* Unflush and start task again */
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
+ gst_pad_start_task (pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, pad,
+ NULL);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ case GST_EVENT_FLUSH_START:{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ default:
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+}
+
+static gboolean
+copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+ GstPad *new_pad = user_data;
+
+ if (GST_EVENT_TYPE (*event) != GST_EVENT_CAPS
+ && GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START)
+ gst_pad_store_sticky_event (new_pad, *event);
+
+ return TRUE;
+}
+
+static GstPad *
+get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
+{
+ GstPad *new_pad = NULL;
+ gint state;
+ gchar *pad_name, *pad_stream_id;
+ GstPadTemplate *template;
+
+ pad_name = g_strdup_printf ("src_%hu", stream_id);
+ new_pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
+ if (new_pad) {
+ g_free (pad_name);
+ return new_pad;
+ }
+
+ g_object_get (self->sctp_association, "state", &state, NULL);
+
+ if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
+ GST_WARNING_OBJECT (self,
+ "The SCTP association must be established before a new stream can be created");
+ return NULL;
+ }
+
+ if (stream_id > MAX_STREAM_ID)
+ return NULL;
+
+ template = gst_static_pad_template_get (&src_template);
+ new_pad = g_object_new (GST_TYPE_SCTP_DEC_PAD, "name", pad_name,
+ "direction", template->direction, "template", template, NULL);
+ g_free (pad_name);
+
+ gst_pad_set_event_function (new_pad,
+ GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_src_event));
+
+ if (!gst_pad_set_active (new_pad, TRUE))
+ goto error_cleanup;
+
+ pad_stream_id =
+ gst_pad_create_stream_id_printf (new_pad, GST_ELEMENT (self), "%hu",
+ stream_id);
+ gst_pad_push_event (new_pad, gst_event_new_stream_start (pad_stream_id));
+ g_free (pad_stream_id);
+ gst_pad_sticky_events_foreach (self->sink_pad, copy_sticky_events, new_pad);
+
+ if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
+ goto error_cleanup;
+
+ gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
+ new_pad, NULL);
+
+ gst_object_ref (new_pad);
+
+ return new_pad;
+
+error_cleanup:
+ gst_object_unref (new_pad);
+ return NULL;
+}
+
+static void
+remove_pad (GstElement * element, GstPad * pad)
+{
+ stop_srcpad_task (pad);
+ gst_pad_set_active (pad, FALSE);
+ gst_element_remove_pad (element, pad);
+}
+
+static void
+on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
+ guint16 stream_id, GstSctpDec * self)
+{
+ gchar *pad_name;
+ GstPad *srcpad;
+
+ pad_name = g_strdup_printf ("src_%hu", stream_id);
+ srcpad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
+ g_free (pad_name);
+ if (!srcpad) {
+ GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
+ return;
+ }
+ remove_pad (GST_ELEMENT (self), srcpad);
+ gst_object_unref (srcpad);
+}
+
+static void
+data_queue_item_free (GstDataQueueItem * item)
+{
+ if (item->object)
+ gst_mini_object_unref (item->object);
+ g_free (item);
+}
+
+static void
+on_receive (GstSctpAssociation * sctp_association, guint8 * buf, gsize length,
+ guint16 stream_id, guint ppid, gpointer user_data)
+{
+ GstSctpDec *self = user_data;
+ GstSctpDecPad *sctpdec_pad;
+ GstPad *src_pad;
+ GstDataQueueItem *item;
+ GstBuffer *gstbuf;
+
+ src_pad = get_pad_for_stream_id (self, stream_id);
+ g_assert (src_pad);
+
+ sctpdec_pad = GST_SCTP_DEC_PAD (src_pad);
+ gstbuf = gst_buffer_new_wrapped (buf, length);
+ gst_sctp_buffer_add_receive_meta (gstbuf, ppid);
+
+ item = g_new0 (GstDataQueueItem, 1);
+ item->object = GST_MINI_OBJECT (gstbuf);
+ item->size = length;
+ item->visible = TRUE;
+ item->destroy = (GDestroyNotify) data_queue_item_free;
+ if (!gst_data_queue_push (sctpdec_pad->packet_queue, item)) {
+ item->destroy (item);
+ GST_DEBUG_OBJECT (src_pad, "Failed to push item because we're flushing");
+ }
+
+ gst_object_unref (src_pad);
+}
+
+static void
+stop_srcpad_task (GstPad * pad)
+{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+ gst_pad_stop_task (pad);
+}
+
+static void
+remove_pad_it (const GValue * item, gpointer user_data)
+{
+ GstPad *pad = g_value_get_object (item);
+ GstSctpDec *self = user_data;
+
+ remove_pad (GST_ELEMENT (self), pad);
+}
+
+static void
+stop_all_srcpad_tasks (GstSctpDec * self)
+{
+ GstIterator *it;
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, remove_pad_it, self) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+}
+
+static void
+sctpdec_cleanup (GstSctpDec * self)
+{
+ if (self->sctp_association) {
+ gst_sctp_association_set_on_packet_received (self->sctp_association, NULL,
+ NULL);
+ g_signal_handler_disconnect (self->sctp_association,
+ self->signal_handler_stream_reset);
+ stop_all_srcpad_tasks (self);
+ gst_sctp_association_force_close (self->sctp_association);
+ g_object_unref (self->sctp_association);
+ self->sctp_association = NULL;
+ }
+}
+
+static void
+on_reset_stream (GstSctpDec * self, guint stream_id)
+{
+ if (self->sctp_association) {
+ gst_sctp_association_reset_stream (self->sctp_association, stream_id);
+ on_gst_sctp_association_stream_reset (self->sctp_association, stream_id,
+ self);
+ }
+}
diff --git a/ext/sctp/gstsctpdec.h b/ext/sctp/gstsctpdec.h
new file mode 100644
index 000000000..845fac4d4
--- /dev/null
+++ b/ext/sctp/gstsctpdec.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifndef __GST_SCTP_DEC_H__
+#define __GST_SCTP_DEC_H__
+
+#include <gst/gst.h>
+
+#include "sctpassociation.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SCTP_DEC (gst_sctp_dec_get_type())
+#define GST_SCTP_DEC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC, GstSctpDec))
+#define GST_SCTP_DEC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC, GstSctpDecClass))
+#define GST_IS_SCTP_DEC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC))
+#define GST_IS_SCTP_DEC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC))
+typedef struct _GstSctpDec GstSctpDec;
+typedef struct _GstSctpDecClass GstSctpDecClass;
+
+struct _GstSctpDec
+{
+ GstElement element;
+
+ GstPad *sink_pad;
+ guint sctp_association_id;
+ guint local_sctp_port;
+
+ GstSctpAssociation *sctp_association;
+ gulong signal_handler_stream_reset;
+};
+
+struct _GstSctpDecClass
+{
+ GstElementClass parent_class;
+
+ void (*on_reset_stream) (GstSctpDec * sctp_dec, guint stream_id);
+};
+
+GType gst_sctp_dec_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SCTP_DEC_H__ */
diff --git a/ext/sctp/gstsctpenc.c b/ext/sctp/gstsctpenc.c
new file mode 100644
index 000000000..80a338f33
--- /dev/null
+++ b/ext/sctp/gstsctpenc.c
@@ -0,0 +1,937 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "gstsctpenc.h"
+
+#include <gst/sctp/sctpsendmeta.h>
+#include <stdio.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
+#define GST_CAT_DEFAULT gst_sctp_enc_debug_category
+
+#define gst_sctp_enc_parent_class parent_class
+G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
+
+static GstStaticPadTemplate sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
+ GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
+
+static GstStaticPadTemplate src_template =
+GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
+ GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
+
+enum
+{
+ SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
+ SIGNAL_GET_STREAM_BYTES_SENT,
+ NUM_SIGNALS
+};
+
+static guint signals[NUM_SIGNALS];
+
+enum
+{
+ PROP_0,
+
+ PROP_GST_SCTP_ASSOCIATION_ID,
+ PROP_REMOTE_SCTP_PORT,
+ PROP_USE_SOCK_STREAM,
+
+ NUM_PROPERTIES
+};
+
+static GParamSpec *properties[NUM_PROPERTIES];
+
+#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
+#define DEFAULT_REMOTE_SCTP_PORT 0
+#define DEFAULT_GST_SCTP_ORDERED TRUE
+#define DEFAULT_SCTP_PPID 1
+#define DEFAULT_USE_SOCK_STREAM FALSE
+
+#define BUFFER_FULL_SLEEP_TIME 100000
+
+GType gst_sctp_enc_pad_get_type (void);
+
+#define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
+#define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
+#define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
+#define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
+#define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
+
+typedef struct _GstSctpEncPad GstSctpEncPad;
+typedef GstPadClass GstSctpEncPadClass;
+
+struct _GstSctpEncPad
+{
+ GstPad parent;
+
+ guint16 stream_id;
+ gboolean ordered;
+ guint32 ppid;
+ GstSctpAssociationPartialReliability reliability;
+ guint32 reliability_param;
+
+ guint64 bytes_sent;
+
+ GMutex lock;
+ GCond cond;
+ gboolean flushing;
+};
+
+G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
+
+static void
+gst_sctp_enc_pad_finalize (GObject * object)
+{
+ GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
+
+ g_cond_clear (&self->cond);
+ g_mutex_clear (&self->lock);
+
+ G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
+}
+
+static void
+gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
+{
+ GObjectClass *gobject_class = (GObjectClass *) klass;
+
+ gobject_class->finalize = gst_sctp_enc_pad_finalize;
+}
+
+static void
+gst_sctp_enc_pad_init (GstSctpEncPad * self)
+{
+ g_mutex_init (&self->lock);
+ g_cond_init (&self->cond);
+ self->flushing = FALSE;
+}
+
+static void gst_sctp_enc_finalize (GObject * object);
+static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
+ GstStateChange transition);
+static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
+ GstPadTemplate * template, const gchar * name, const GstCaps * caps);
+static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
+static void gst_sctp_enc_srcpad_loop (GstPad * pad);
+static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
+static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static void on_sctp_association_state_changed (GstSctpAssociation *
+ sctp_association, GParamSpec * pspec, GstSctpEnc * self);
+
+static gboolean configure_association (GstSctpEnc * self);
+static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
+ const guint8 * buf, gsize length, gpointer user_data);
+static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
+static void sctpenc_cleanup (GstSctpEnc * self);
+static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
+ GstSctpAssociationPartialReliability * reliability,
+ guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
+static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
+
+static void
+gst_sctp_enc_class_init (GstSctpEncClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *element_class;
+
+ gobject_class = (GObjectClass *) klass;
+ element_class = (GstElementClass *) klass;
+
+ GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
+ "sctpenc", 0, "debug category for sctpenc element");
+
+ gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
+ gst_static_pad_template_get (&src_template));
+ gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
+ gst_static_pad_template_get (&sink_template));
+
+ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
+ gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
+ gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
+
+ element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
+ element_class->request_new_pad =
+ GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
+ element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
+
+ properties[PROP_GST_SCTP_ASSOCIATION_ID] =
+ g_param_spec_uint ("sctp-association-id",
+ "SCTP Association ID",
+ "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
+ "This value must be set before any pads are requested.",
+ 0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_REMOTE_SCTP_PORT] =
+ g_param_spec_uint ("remote-sctp-port",
+ "Remote SCTP port",
+ "Sctp remote sctp port for the sctp association. The local port is configured via the "
+ "GstSctpDec element.",
+ 0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_USE_SOCK_STREAM] =
+ g_param_spec_boolean ("use-sock-stream",
+ "Use sock-stream",
+ "When set to TRUE, a sequenced, reliable, connection-based connection is used."
+ "When TRUE the partial reliability parameters of the channel are ignored.",
+ DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
+
+ signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
+ g_signal_new ("sctp-association-established",
+ G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
+
+ signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
+ G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_UINT64, 1, G_TYPE_UINT);
+
+ klass->on_get_stream_bytes_sent =
+ GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
+
+ gst_element_class_set_static_metadata (element_class,
+ "SCTP Encoder",
+ "Encoder/Network/SCTP",
+ "Encodes packets with SCTP",
+ "George Kiagiadakis <george.kiagiadakis@collabora.com>");
+}
+
+static gboolean
+data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
+ guint64 time, gpointer user_data)
+{
+ /* TODO: When are we considered full? */
+ return FALSE;
+}
+
+static void
+data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
+{
+}
+
+static void
+data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
+{
+}
+
+static void
+gst_sctp_enc_init (GstSctpEnc * self)
+{
+ self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
+ self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
+
+ self->sctp_association = NULL;
+ self->outbound_sctp_packet_queue =
+ gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
+ data_queue_empty_cb, NULL);
+
+ self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
+ gst_pad_set_event_function (self->src_pad,
+ GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
+ gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
+
+ g_queue_init (&self->pending_pads);
+}
+
+static void
+gst_sctp_enc_finalize (GObject * object)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (object);
+
+ g_queue_clear (&self->pending_pads);
+ gst_object_unref (self->outbound_sctp_packet_queue);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_sctp_enc_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (object);
+
+ switch (prop_id) {
+ case PROP_GST_SCTP_ASSOCIATION_ID:
+ self->sctp_association_id = g_value_get_uint (value);
+ break;
+ case PROP_REMOTE_SCTP_PORT:
+ self->remote_sctp_port = g_value_get_uint (value);
+ break;
+ case PROP_USE_SOCK_STREAM:
+ self->use_sock_stream = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
+ GParamSpec * pspec)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (object);
+
+ switch (prop_id) {
+ case PROP_GST_SCTP_ASSOCIATION_ID:
+ g_value_set_uint (value, self->sctp_association_id);
+ break;
+ case PROP_REMOTE_SCTP_PORT:
+ g_value_set_uint (value, self->remote_sctp_port);
+ break;
+ case PROP_USE_SOCK_STREAM:
+ g_value_set_boolean (value, self->use_sock_stream);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (element);
+ GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
+ gboolean res = TRUE;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ break;
+
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ self->need_segment = self->need_stream_start_caps = TRUE;
+ gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
+ gst_pad_start_task (self->src_pad,
+ (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
+ res = configure_association (self);
+ break;
+
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ break;
+
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ sctpenc_cleanup (self);
+ break;
+
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ break;
+ default:
+ break;
+ }
+
+ if (res)
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ return ret;
+}
+
+static GstPad *
+gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
+ const gchar * new_pad_name, const GstCaps * caps)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (element);
+ GstPad *new_pad = NULL;
+ GstSctpEncPad *sctpenc_pad;
+ guint32 stream_id;
+ gint state;
+ guint32 new_ppid;
+ gboolean is_new_ppid;
+
+ g_object_get (self->sctp_association, "state", &state, NULL);
+
+ if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
+ g_warning
+ ("The SCTP association must be established before a new stream can be created");
+ goto invalid_state;
+ }
+
+ if (!template)
+ goto invalid_parameter;
+
+ if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
+ || stream_id > 65534) /* 65535 is not a valid stream id */
+ goto invalid_parameter;
+
+ new_pad = gst_element_get_static_pad (element, new_pad_name);
+ if (new_pad) {
+ gst_object_unref (new_pad);
+ new_pad = NULL;
+ goto invalid_parameter;
+ }
+
+ new_pad =
+ g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
+ template->direction, "template", template, NULL);
+ gst_pad_set_chain_function (new_pad,
+ GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
+ gst_pad_set_event_function (new_pad,
+ GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
+
+ sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
+ sctpenc_pad->stream_id = stream_id;
+ sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
+
+ get_config_from_caps (caps, &sctpenc_pad->ordered, &sctpenc_pad->reliability,
+ &sctpenc_pad->reliability_param, &new_ppid, &is_new_ppid);
+
+ if (is_new_ppid)
+ sctpenc_pad->ppid = new_ppid;
+ sctpenc_pad->flushing = FALSE;
+
+ if (!gst_pad_set_active (new_pad, TRUE))
+ goto error_cleanup;
+
+ if (!gst_element_add_pad (element, new_pad))
+ goto error_cleanup;
+
+invalid_state:
+invalid_parameter:
+ return new_pad;
+error_cleanup:
+ gst_object_unref (new_pad);
+ return NULL;
+}
+
+static void
+gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
+{
+ GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
+ GstSctpEnc *self;
+ guint stream_id = 0;
+
+ self = GST_SCTP_ENC (element);
+
+ g_mutex_lock (&sctpenc_pad->lock);
+ sctpenc_pad->flushing = TRUE;
+ g_cond_signal (&sctpenc_pad->cond);
+ g_mutex_unlock (&sctpenc_pad->lock);
+
+ stream_id = sctpenc_pad->stream_id;
+ gst_pad_set_active (pad, FALSE);
+
+ if (self->sctp_association)
+ gst_sctp_association_reset_stream (self->sctp_association, stream_id);
+
+ gst_element_remove_pad (element, pad);
+}
+
+static void
+gst_sctp_enc_srcpad_loop (GstPad * pad)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
+ GstFlowReturn flow_ret;
+ GstDataQueueItem *item;
+
+ if (self->need_stream_start_caps) {
+ gchar s_id[32];
+ GstCaps *caps;
+
+ g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
+ gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
+
+ caps = gst_caps_new_empty_simple ("application/x-sctp");
+ gst_pad_set_caps (self->src_pad, caps);
+ gst_caps_unref (caps);
+
+ self->need_stream_start_caps = FALSE;
+ }
+
+ if (self->need_segment) {
+ GstSegment segment;
+
+ gst_segment_init (&segment, GST_FORMAT_BYTES);
+ gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
+
+ self->need_segment = FALSE;
+ }
+
+ if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
+ flow_ret = gst_pad_push (self->src_pad, GST_BUFFER (item->object));
+ item->object = NULL;
+
+ if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
+ || flow_ret == GST_FLOW_NOT_LINKED)) {
+ GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
+ gst_flow_get_name (flow_ret));
+ } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
+ GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
+ gst_flow_get_name (flow_ret));
+ }
+
+ if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
+ GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
+ gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
+ gst_data_queue_flush (self->outbound_sctp_packet_queue);
+ gst_pad_pause_task (pad);
+ }
+
+ item->destroy (item);
+ } else {
+ GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
+ gst_pad_pause_task (pad);
+ }
+}
+
+static GstFlowReturn
+gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (parent);
+ GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
+ GstMapInfo map;
+ guint32 ppid;
+ gboolean ordered;
+ GstSctpAssociationPartialReliability pr;
+ guint32 pr_param;
+ gpointer state = NULL;
+ GstMeta *meta;
+ const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
+ GstFlowReturn flow_ret = GST_FLOW_ERROR;
+
+ ppid = sctpenc_pad->ppid;
+ ordered = sctpenc_pad->ordered;
+ pr = sctpenc_pad->reliability;
+ pr_param = sctpenc_pad->reliability_param;
+
+ while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
+ if (meta->info->api == meta_info->api) {
+ GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
+
+ ppid = sctp_send_meta->ppid;
+ ordered = sctp_send_meta->ordered;
+ pr_param = sctp_send_meta->pr_param;
+ switch (sctp_send_meta->pr) {
+ case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
+ pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
+ break;
+ case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
+ pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
+ break;
+ case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
+ pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
+ break;
+ case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
+ pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
+ break;
+ }
+ break;
+ }
+ }
+
+ if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
+ g_warning ("Could not map GstBuffer");
+ goto error;
+ }
+
+ g_mutex_lock (&sctpenc_pad->lock);
+ while (!sctpenc_pad->flushing) {
+ gboolean data_sent = FALSE;
+
+ g_mutex_unlock (&sctpenc_pad->lock);
+
+ data_sent =
+ gst_sctp_association_send_data (self->sctp_association, map.data,
+ map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
+
+ g_mutex_lock (&sctpenc_pad->lock);
+ if (data_sent) {
+ sctpenc_pad->bytes_sent += map.size;
+ break;
+ } else if (!sctpenc_pad->flushing) {
+ gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
+
+ /* The buffer was probably full. Retry in a while */
+ GST_OBJECT_LOCK (self);
+ g_queue_push_tail (&self->pending_pads, sctpenc_pad);
+ GST_OBJECT_UNLOCK (self);
+
+ g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
+
+ GST_OBJECT_LOCK (self);
+ g_queue_remove (&self->pending_pads, sctpenc_pad);
+ GST_OBJECT_UNLOCK (self);
+ }
+ }
+ flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
+ g_mutex_unlock (&sctpenc_pad->lock);
+
+ gst_buffer_unmap (buffer, &map);
+error:
+ gst_buffer_unref (buffer);
+ return flow_ret;
+}
+
+static gboolean
+gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
+ gboolean ret, is_new_ppid;
+ guint32 new_ppid;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CAPS:{
+ GstCaps *caps;
+
+ gst_event_parse_caps (event, &caps);
+ get_config_from_caps (caps, &sctpenc_pad->ordered,
+ &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
+ &is_new_ppid);
+ if (is_new_ppid)
+ sctpenc_pad->ppid = new_ppid;
+ gst_event_unref (event);
+ ret = TRUE;
+ break;
+ }
+ case GST_EVENT_STREAM_START:
+ case GST_EVENT_SEGMENT:
+ /* Drop these, we create our own */
+ ret = TRUE;
+ gst_event_unref (event);
+ break;
+ case GST_EVENT_EOS:
+ /* Drop this, we're never EOS until shut down */
+ ret = TRUE;
+ gst_event_unref (event);
+ break;
+ case GST_EVENT_FLUSH_START:
+ g_mutex_lock (&sctpenc_pad->lock);
+ sctpenc_pad->flushing = TRUE;
+ g_cond_signal (&sctpenc_pad->cond);
+ g_mutex_unlock (&sctpenc_pad->lock);
+
+ ret = gst_pad_event_default (pad, parent, event);
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ sctpenc_pad->flushing = FALSE;
+ ret = gst_pad_event_default (pad, parent, event);
+ break;
+ default:
+ ret = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ return ret;
+}
+
+static void
+flush_sinkpad (const GValue * item, gpointer user_data)
+{
+ GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
+ gboolean flush = GPOINTER_TO_INT (user_data);
+
+ if (flush) {
+ g_mutex_lock (&sctpenc_pad->lock);
+ sctpenc_pad->flushing = TRUE;
+ g_cond_signal (&sctpenc_pad->cond);
+ g_mutex_unlock (&sctpenc_pad->lock);
+ } else {
+ sctpenc_pad->flushing = FALSE;
+ }
+}
+
+static gboolean
+gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstSctpEnc *self = GST_SCTP_ENC (parent);
+ gboolean ret;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:{
+ GstIterator *it;
+
+ gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
+ gst_data_queue_flush (self->outbound_sctp_packet_queue);
+
+ it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, flush_sinkpad,
+ GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+
+ ret = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ case GST_EVENT_RECONFIGURE:
+ case GST_EVENT_FLUSH_STOP:{
+ GstIterator *it;
+
+ it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, flush_sinkpad,
+ GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+
+ gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
+ self->need_segment = TRUE;
+ gst_pad_start_task (self->src_pad,
+ (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
+
+ ret = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ default:
+ ret = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ return ret;
+}
+
+static gboolean
+configure_association (GstSctpEnc * self)
+{
+ gint state;
+
+ self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
+
+ g_object_get (self->sctp_association, "state", &state, NULL);
+
+ if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
+ GST_WARNING_OBJECT (self,
+ "Could not configure SCTP association. Association already in use!");
+ g_object_unref (self->sctp_association);
+ self->sctp_association = NULL;
+ goto error;
+ }
+
+ self->signal_handler_state_changed =
+ g_signal_connect_object (self->sctp_association, "notify::state",
+ G_CALLBACK (on_sctp_association_state_changed), self, 0);
+
+ g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
+ "remote-port", G_BINDING_SYNC_CREATE);
+
+ g_object_bind_property (self, "use-sock-stream", self->sctp_association,
+ "use-sock-stream", G_BINDING_SYNC_CREATE);
+
+ gst_sctp_association_set_on_packet_out (self->sctp_association,
+ on_sctp_packet_out, self);
+
+ return TRUE;
+error:
+ return FALSE;
+}
+
+static void
+on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
+ GParamSpec * pspec, GstSctpEnc * self)
+{
+ gint state;
+
+ g_object_get (sctp_association, "state", &state, NULL);
+ switch (state) {
+ case GST_SCTP_ASSOCIATION_STATE_NEW:
+ break;
+ case GST_SCTP_ASSOCIATION_STATE_READY:
+ gst_sctp_association_start (sctp_association);
+ break;
+ case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
+ break;
+ case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
+ g_signal_emit_by_name (self, "sctp-association-established", TRUE);
+ break;
+ case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
+ g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
+ FALSE);
+ break;
+ case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
+ break;
+ case GST_SCTP_ASSOCIATION_STATE_ERROR:
+ break;
+ }
+}
+
+static void
+data_queue_item_free (GstDataQueueItem * item)
+{
+ if (item->object)
+ gst_mini_object_unref (item->object);
+ g_free (item);
+}
+
+static void
+on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
+ gsize length, gpointer user_data)
+{
+ GstSctpEnc *self = user_data;
+ GstBuffer *gstbuf;
+ GstDataQueueItem *item;
+ GList *pending_pads, *l;
+ GstSctpEncPad *sctpenc_pad;
+
+ gstbuf = gst_buffer_new_wrapped (g_memdup (buf, length), length);
+
+ item = g_new0 (GstDataQueueItem, 1);
+ item->object = GST_MINI_OBJECT (gstbuf);
+ item->size = length;
+ item->visible = TRUE;
+ item->destroy = (GDestroyNotify) data_queue_item_free;
+
+ if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
+ item->destroy (item);
+ GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
+ }
+
+ /* Wake up pads in the order they waited, oldest pad first */
+ GST_OBJECT_LOCK (self);
+ pending_pads = NULL;
+ while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
+ pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
+ }
+ GST_OBJECT_UNLOCK (self);
+
+ for (l = pending_pads; l; l = l->next) {
+ sctpenc_pad = l->data;
+ g_mutex_lock (&sctpenc_pad->lock);
+ g_cond_signal (&sctpenc_pad->cond);
+ g_mutex_unlock (&sctpenc_pad->lock);
+ }
+ g_list_free (pending_pads);
+}
+
+static void
+stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
+{
+ gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
+ gst_data_queue_flush (self->outbound_sctp_packet_queue);
+ gst_pad_stop_task (pad);
+}
+
+static void
+remove_sinkpad (const GValue * item, gpointer user_data)
+{
+ GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
+ GstSctpEnc *self = user_data;
+
+ gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
+}
+
+static void
+sctpenc_cleanup (GstSctpEnc * self)
+{
+ GstIterator *it;
+
+ gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL);
+ g_signal_handler_disconnect (self->sctp_association,
+ self->signal_handler_state_changed);
+ stop_srcpad_task (self->src_pad, self);
+ gst_sctp_association_force_close (self->sctp_association);
+ g_object_unref (self->sctp_association);
+ self->sctp_association = NULL;
+
+ it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+ g_queue_clear (&self->pending_pads);
+}
+
+static void
+get_config_from_caps (const GstCaps * caps, gboolean * ordered,
+ GstSctpAssociationPartialReliability * reliability,
+ guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
+{
+ GstStructure *s;
+ guint i, n;
+
+ *ordered = TRUE;
+ *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
+ *reliability_param = 0;
+ *ppid_available = FALSE;
+
+ n = gst_caps_get_size (caps);
+ for (i = 0; i < n; i++) {
+ s = gst_caps_get_structure (caps, i);
+ if (gst_structure_has_field (s, "ordered")) {
+ const GValue *v = gst_structure_get_value (s, "ordered");
+ *ordered = g_value_get_boolean (v);
+ }
+ if (gst_structure_has_field (s, "partially-reliability")) {
+ const GValue *v = gst_structure_get_value (s, "partially-reliability");
+ const gchar *reliability_string = g_value_get_string (v);
+
+ if (!g_strcmp0 (reliability_string, "none"))
+ *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
+ else if (!g_strcmp0 (reliability_string, "ttl"))
+ *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
+ else if (!g_strcmp0 (reliability_string, "buf"))
+ *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
+ else if (!g_strcmp0 (reliability_string, "rtx"))
+ *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
+ }
+ if (gst_structure_has_field (s, "reliability-parameter")) {
+ const GValue *v = gst_structure_get_value (s, "reliability-parameter");
+ *reliability_param = g_value_get_uint (v);
+ }
+ if (gst_structure_has_field (s, "ppid")) {
+ const GValue *v = gst_structure_get_value (s, "ppid");
+ *ppid = g_value_get_uint (v);
+ *ppid_available = TRUE;
+ }
+ }
+}
+
+static guint64
+on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
+{
+ gchar *pad_name;
+ GstPad *pad;
+ GstSctpEncPad *sctpenc_pad;
+ guint64 bytes_sent;
+
+ pad_name = g_strdup_printf ("sink_%u", stream_id);
+ pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
+ g_free (pad_name);
+
+ if (!pad) {
+ GST_DEBUG_OBJECT (self,
+ "Buffered amount requested on a stream that does not exist!");
+ return 0;
+ }
+
+ sctpenc_pad = GST_SCTP_ENC_PAD (pad);
+
+ g_mutex_lock (&sctpenc_pad->lock);
+ bytes_sent = sctpenc_pad->bytes_sent;
+ g_mutex_unlock (&sctpenc_pad->lock);
+
+ gst_object_unref (sctpenc_pad);
+
+ return bytes_sent;
+}
diff --git a/ext/sctp/gstsctpenc.h b/ext/sctp/gstsctpenc.h
new file mode 100644
index 000000000..a05bd4e8d
--- /dev/null
+++ b/ext/sctp/gstsctpenc.h
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifndef __GST_SCTP_ENC_H__
+#define __GST_SCTP_ENC_H__
+
+#include <gst/gst.h>
+#include <gst/base/base.h>
+#include "sctpassociation.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SCTP_ENC (gst_sctp_enc_get_type())
+#define GST_SCTP_ENC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC, GstSctpEnc))
+#define GST_SCTP_ENC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC, GstSctpEncClass))
+#define GST_IS_SCTP_ENC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC))
+#define GST_IS_SCTP_ENC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC))
+typedef struct _GstSctpEnc GstSctpEnc;
+typedef struct _GstSctpEncClass GstSctpEncClass;
+typedef struct _GstSctpEncPrivate GstSctpEncPrivate;
+
+struct _GstSctpEnc
+{
+ GstElement element;
+
+ GstPad *src_pad;
+ gboolean need_stream_start_caps, need_segment;
+ guint32 sctp_association_id;
+ guint16 remote_sctp_port;
+ gboolean use_sock_stream;
+
+ GstSctpAssociation *sctp_association;
+ GstDataQueue *outbound_sctp_packet_queue;
+
+ GQueue pending_pads;
+
+ gulong signal_handler_state_changed;
+};
+
+struct _GstSctpEncClass
+{
+ GstElementClass parent_class;
+
+ void (*on_sctp_association_is_established) (GstSctpEnc * sctp_enc,
+ gboolean established);
+ guint64 (*on_get_stream_bytes_sent) (GstSctpEnc * sctp_enc,
+ guint stream_id);
+
+};
+
+GType gst_sctp_enc_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SCTP_ENC_H__ */
diff --git a/ext/sctp/gstsctpplugin.c b/ext/sctp/gstsctpplugin.c
new file mode 100644
index 000000000..888a94c84
--- /dev/null
+++ b/ext/sctp/gstsctpplugin.c
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstsctpdec.h"
+#include "gstsctpenc.h"
+
+#include <gst/gst.h>
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ return gst_element_register (plugin, "sctpenc", GST_RANK_NONE,
+ GST_TYPE_SCTP_ENC)
+ && gst_element_register (plugin, "sctpdec", GST_RANK_NONE,
+ GST_TYPE_SCTP_DEC);
+}
+
+
+#ifndef PACKAGE
+#define PACKAGE "sctp"
+#endif
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ sctp,
+ "SCTP encoder/decoder plugin",
+ plugin_init, PACKAGE_VERSION, "BSD", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/ext/sctp/sctpassociation.c b/ext/sctp/sctpassociation.c
new file mode 100644
index 000000000..b7c686426
--- /dev/null
+++ b/ext/sctp/sctpassociation.c
@@ -0,0 +1,836 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "sctpassociation.h"
+
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+
+#define GST_SCTP_ASSOCIATION_STATE_TYPE (gst_sctp_association_state_get_type())
+static GType
+gst_sctp_association_state_get_type (void)
+{
+ static const GEnumValue values[] = {
+ {GST_SCTP_ASSOCIATION_STATE_NEW, "state-new", "state-new"},
+ {GST_SCTP_ASSOCIATION_STATE_READY, "state-ready", "state-ready"},
+ {GST_SCTP_ASSOCIATION_STATE_CONNECTING, "state-connecting",
+ "state-connecting"},
+ {GST_SCTP_ASSOCIATION_STATE_CONNECTED, "state-connected",
+ "state-connected"},
+ {GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, "state-disconnecting",
+ "state-disconnecting"},
+ {GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, "state-disconnected",
+ "state-disconnected"},
+ {GST_SCTP_ASSOCIATION_STATE_ERROR, "state-error", "state-error"},
+ {0, NULL, NULL}
+ };
+ static volatile GType id = 0;
+
+ if (g_once_init_enter ((gsize *) & id)) {
+ GType _id;
+ _id = g_enum_register_static ("GstSctpAssociationState", values);
+ g_once_init_leave ((gsize *) & id, _id);
+ }
+
+ return id;
+}
+
+G_DEFINE_TYPE (GstSctpAssociation, gst_sctp_association, G_TYPE_OBJECT);
+
+enum
+{
+ SIGNAL_STREAM_RESET,
+ LAST_SIGNAL
+};
+
+
+enum
+{
+ PROP_0,
+
+ PROP_ASSOCIATION_ID,
+ PROP_LOCAL_PORT,
+ PROP_REMOTE_PORT,
+ PROP_STATE,
+ PROP_USE_SOCK_STREAM,
+
+ NUM_PROPERTIES
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+static GParamSpec *properties[NUM_PROPERTIES];
+
+#define DEFAULT_NUMBER_OF_SCTP_STREAMS 10
+#define DEFAULT_LOCAL_SCTP_PORT 0
+#define DEFAULT_REMOTE_SCTP_PORT 0
+
+static GHashTable *associations = NULL;
+G_LOCK_DEFINE_STATIC (associations_lock);
+static guint32 number_of_associations = 0;
+
+/* Interface implementations */
+static void gst_sctp_association_finalize (GObject * object);
+static void gst_sctp_association_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_sctp_association_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+static struct socket *create_sctp_socket (GstSctpAssociation *
+ gst_sctp_association);
+static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation *
+ gst_sctp_association, guint16 port);
+static gpointer connection_thread_func (GstSctpAssociation * self);
+static gboolean client_role_connect (GstSctpAssociation * self);
+static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
+ guint8 set_df);
+static int receive_cb (struct socket *sock, union sctp_sockstore addr,
+ void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags,
+ void *ulp_info);
+static void handle_notification (GstSctpAssociation * self,
+ const union sctp_notification *notification, size_t length);
+static void handle_association_changed (GstSctpAssociation * self,
+ const struct sctp_assoc_change *sac);
+static void handle_stream_reset_event (GstSctpAssociation * self,
+ const struct sctp_stream_reset_event *ssr);
+static void handle_message (GstSctpAssociation * self, guint8 * data,
+ guint32 datalen, guint16 stream_id, guint32 ppid);
+
+static void maybe_set_state_to_ready (GstSctpAssociation * self);
+static void gst_sctp_association_change_state (GstSctpAssociation * self,
+ GstSctpAssociationState new_state, gboolean notify);
+
+static void
+gst_sctp_association_class_init (GstSctpAssociationClass * klass)
+{
+ GObjectClass *gobject_class;
+
+ gobject_class = (GObjectClass *) klass;
+
+ gobject_class->finalize = gst_sctp_association_finalize;
+ gobject_class->set_property = gst_sctp_association_set_property;
+ gobject_class->get_property = gst_sctp_association_get_property;
+
+ signals[SIGNAL_STREAM_RESET] =
+ g_signal_new ("stream-reset", G_OBJECT_CLASS_TYPE (klass),
+ G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSctpAssociationClass,
+ on_sctp_stream_reset), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ properties[PROP_ASSOCIATION_ID] = g_param_spec_uint ("association-id",
+ "The SCTP association-id", "The SCTP association-id.", 0, G_MAXUSHORT,
+ DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_LOCAL_PORT] = g_param_spec_uint ("local-port", "Local SCTP",
+ "The local SCTP port for this association", 0, G_MAXUSHORT,
+ DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_REMOTE_PORT] =
+ g_param_spec_uint ("remote-port", "Remote SCTP",
+ "The remote SCTP port for this association", 0, G_MAXUSHORT,
+ DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_STATE] = g_param_spec_enum ("state", "SCTP Association state",
+ "The state of the SCTP association", GST_SCTP_ASSOCIATION_STATE_TYPE,
+ GST_SCTP_ASSOCIATION_STATE_NEW,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_USE_SOCK_STREAM] =
+ g_param_spec_boolean ("use-sock-stream", "Use sock-stream",
+ "When set to TRUE, a sequenced, reliable, connection-based connection is used."
+ "When TRUE the partial reliability parameters of the channel is ignored.",
+ FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
+}
+
+static void
+gst_sctp_association_init (GstSctpAssociation * self)
+{
+ /* No need to lock mutex here as long as the function is only called from gst_sctp_association_get */
+ if (number_of_associations == 0) {
+ usrsctp_init (0, sctp_packet_out, g_print);
+
+ /* Explicit Congestion Notification */
+ usrsctp_sysctl_set_sctp_ecn_enable (0);
+
+ usrsctp_sysctl_set_sctp_nr_outgoing_streams_default
+ (DEFAULT_NUMBER_OF_SCTP_STREAMS);
+ }
+ number_of_associations++;
+
+ self->local_port = DEFAULT_LOCAL_SCTP_PORT;
+ self->remote_port = DEFAULT_REMOTE_SCTP_PORT;
+ self->sctp_ass_sock = NULL;
+
+ self->connection_thread = NULL;
+ g_mutex_init (&self->association_mutex);
+
+ self->state = GST_SCTP_ASSOCIATION_STATE_NEW;
+
+ self->use_sock_stream = FALSE;
+
+ usrsctp_register_address ((void *) self);
+}
+
+static void
+gst_sctp_association_finalize (GObject * object)
+{
+ GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
+
+ G_LOCK (associations_lock);
+
+ g_hash_table_remove (associations, GUINT_TO_POINTER (self->association_id));
+
+ usrsctp_deregister_address ((void *) self);
+ number_of_associations--;
+ if (number_of_associations == 0) {
+ usrsctp_finish ();
+ }
+ G_UNLOCK (associations_lock);
+
+ g_thread_join (self->connection_thread);
+
+ G_OBJECT_CLASS (gst_sctp_association_parent_class)->finalize (object);
+}
+
+static void
+gst_sctp_association_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
+
+ g_mutex_lock (&self->association_mutex);
+ if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) {
+ switch (prop_id) {
+ case PROP_LOCAL_PORT:
+ case PROP_REMOTE_PORT:
+ g_warning ("These properties cannot be set in this state");
+ goto error;
+ }
+ }
+
+ switch (prop_id) {
+ case PROP_ASSOCIATION_ID:
+ self->association_id = g_value_get_uint (value);
+ break;
+ case PROP_LOCAL_PORT:
+ self->local_port = g_value_get_uint (value);
+ break;
+ case PROP_REMOTE_PORT:
+ self->remote_port = g_value_get_uint (value);
+ break;
+ case PROP_STATE:
+ self->state = g_value_get_enum (value);
+ break;
+ case PROP_USE_SOCK_STREAM:
+ self->use_sock_stream = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+
+ g_mutex_unlock (&self->association_mutex);
+ if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT)
+ maybe_set_state_to_ready (self);
+
+ return;
+
+error:
+ g_mutex_unlock (&self->association_mutex);
+}
+
+static void
+maybe_set_state_to_ready (GstSctpAssociation * self)
+{
+ gboolean signal_ready_state = FALSE;
+
+ g_mutex_lock (&self->association_mutex);
+ if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) &&
+ (self->local_port != 0 && self->remote_port != 0)
+ && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) {
+ signal_ready_state = TRUE;
+ gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY,
+ FALSE);
+ }
+ g_mutex_unlock (&self->association_mutex);
+
+ /* The reason the state is changed twice is that we do not want to change state with
+ * notification while the association_mutex is locked. If someone listens
+ * on property change and call this object a deadlock might occur.*/
+ if (signal_ready_state)
+ gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY,
+ TRUE);
+
+}
+
+static void
+gst_sctp_association_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
+
+ switch (prop_id) {
+ case PROP_ASSOCIATION_ID:
+ g_value_set_uint (value, self->association_id);
+ break;
+ case PROP_LOCAL_PORT:
+ g_value_set_uint (value, self->local_port);
+ break;
+ case PROP_REMOTE_PORT:
+ g_value_set_uint (value, self->remote_port);
+ break;
+ case PROP_STATE:
+ g_value_set_enum (value, self->state);
+ break;
+ case PROP_USE_SOCK_STREAM:
+ g_value_set_boolean (value, self->use_sock_stream);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+/* Public functions */
+
+GstSctpAssociation *
+gst_sctp_association_get (guint32 association_id)
+{
+ GstSctpAssociation *association;
+
+ G_LOCK (associations_lock);
+ if (!associations) {
+ associations =
+ g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, NULL);
+ }
+
+ association =
+ g_hash_table_lookup (associations, GUINT_TO_POINTER (association_id));
+ if (!association) {
+ association =
+ g_object_new (GST_SCTP_TYPE_ASSOCIATION, "association-id",
+ association_id, NULL);
+ g_hash_table_insert (associations, GUINT_TO_POINTER (association_id),
+ association);
+ } else {
+ g_object_ref (association);
+ }
+ G_UNLOCK (associations_lock);
+ return association;
+}
+
+gboolean
+gst_sctp_association_start (GstSctpAssociation * self)
+{
+ gchar *thread_name;
+
+ g_mutex_lock (&self->association_mutex);
+ if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) {
+ g_warning ("SCTP association is in wrong state and cannot be started");
+ goto configure_required;
+ }
+
+ if ((self->sctp_ass_sock = create_sctp_socket (self)) == NULL)
+ goto error;
+
+ gst_sctp_association_change_state (self,
+ GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE);
+ g_mutex_unlock (&self->association_mutex);
+
+ /* The reason the state is changed twice is that we do not want to change state with
+ * notification while the association_mutex is locked. If someone listens
+ * on property change and call this object a deadlock might occur.*/
+ gst_sctp_association_change_state (self,
+ GST_SCTP_ASSOCIATION_STATE_CONNECTING, TRUE);
+
+ thread_name = g_strdup_printf ("connection_thread_%u", self->association_id);
+ self->connection_thread = g_thread_new (thread_name,
+ (GThreadFunc) connection_thread_func, self);
+ g_free (thread_name);
+
+ return TRUE;
+error:
+ g_mutex_unlock (&self->association_mutex);
+ gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
+ TRUE);
+configure_required:
+ g_mutex_unlock (&self->association_mutex);
+ return FALSE;
+}
+
+void
+gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
+ GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data)
+{
+ g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
+
+ g_mutex_lock (&self->association_mutex);
+ if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) {
+ self->packet_out_cb = packet_out_cb;
+ self->packet_out_user_data = user_data;
+ } else {
+ /* This is to be thread safe. The Association might try to write to the closure already */
+ g_warning ("It is not possible to change packet callback in this state");
+ }
+ g_mutex_unlock (&self->association_mutex);
+
+ maybe_set_state_to_ready (self);
+}
+
+void
+gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
+ GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data)
+{
+ g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
+
+ g_mutex_lock (&self->association_mutex);
+ if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) {
+ self->packet_received_cb = packet_received_cb;
+ self->packet_received_user_data = user_data;
+ } else {
+ /* This is to be thread safe. The Association might try to write to the closure already */
+ g_warning ("It is not possible to change receive callback in this state");
+ }
+ g_mutex_unlock (&self->association_mutex);
+
+ maybe_set_state_to_ready (self);
+}
+
+void
+gst_sctp_association_incoming_packet (GstSctpAssociation * self, guint8 * buf,
+ guint32 length)
+{
+ usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0);
+}
+
+gboolean
+gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf,
+ guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered,
+ GstSctpAssociationPartialReliability pr, guint32 reliability_param)
+{
+ struct sctp_sendv_spa spa;
+ gint32 bytes_sent;
+ gboolean result = FALSE;
+ struct sockaddr_conn remote_addr;
+
+ g_mutex_lock (&self->association_mutex);
+ if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED)
+ goto end;
+
+ memset (&spa, 0, sizeof (spa));
+
+ spa.sendv_sndinfo.snd_ppid = g_htonl (ppid);
+ spa.sendv_sndinfo.snd_sid = stream_id;
+ spa.sendv_sndinfo.snd_flags = ordered ? 0 : SCTP_UNORDERED;
+ spa.sendv_sndinfo.snd_context = 0;
+ spa.sendv_sndinfo.snd_assoc_id = 0;
+ spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
+ if (pr != GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE) {
+ spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
+ spa.sendv_prinfo.pr_value = g_htonl (reliability_param);
+ if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL)
+ spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
+ else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX)
+ spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
+ else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF)
+ spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_BUF;
+ }
+
+ remote_addr = get_sctp_socket_address (self, self->remote_port);
+ bytes_sent =
+ usrsctp_sendv (self->sctp_ass_sock, buf, length,
+ (struct sockaddr *) &remote_addr, 1, (void *) &spa,
+ (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
+ if (bytes_sent < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* Resending this buffer is taken care of by the gstsctpenc */
+ goto end;
+ } else {
+ g_warning ("Error sending data on stream %u: (%u) %s", stream_id, errno,
+ strerror (errno));
+ goto end;
+ }
+ }
+
+ result = TRUE;
+end:
+ g_mutex_unlock (&self->association_mutex);
+ return result;
+}
+
+
+void
+gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id)
+{
+ struct sctp_reset_streams *srs;
+ socklen_t length;
+
+ length = (socklen_t) (sizeof (struct sctp_reset_streams) + sizeof (guint16));
+ srs = (struct sctp_reset_streams *) g_malloc0 (length);
+ srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
+ srs->srs_number_streams = 1;
+ srs->srs_stream_list[0] = stream_id;
+
+ g_mutex_lock (&self->association_mutex);
+ usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS,
+ srs, length);
+ g_mutex_unlock (&self->association_mutex);
+
+ g_free (srs);
+}
+
+void
+gst_sctp_association_force_close (GstSctpAssociation * self)
+{
+ g_mutex_lock (&self->association_mutex);
+ if (self->sctp_ass_sock) {
+ usrsctp_close (self->sctp_ass_sock);
+ self->sctp_ass_sock = NULL;
+
+ }
+ g_mutex_unlock (&self->association_mutex);
+}
+
+static struct socket *
+create_sctp_socket (GstSctpAssociation * self)
+{
+ struct socket *sock;
+ struct linger l;
+ struct sctp_event event;
+ struct sctp_assoc_value stream_reset;
+ int value = 1;
+ guint16 event_types[] = {
+ SCTP_ASSOC_CHANGE,
+ SCTP_PEER_ADDR_CHANGE,
+ SCTP_REMOTE_ERROR,
+ SCTP_SEND_FAILED,
+ SCTP_SHUTDOWN_EVENT,
+ SCTP_ADAPTATION_INDICATION,
+ /*SCTP_PARTIAL_DELIVERY_EVENT, */
+ /*SCTP_AUTHENTICATION_EVENT, */
+ SCTP_STREAM_RESET_EVENT,
+ /*SCTP_SENDER_DRY_EVENT, */
+ /*SCTP_NOTIFICATIONS_STOPPED_EVENT, */
+ /*SCTP_ASSOC_RESET_EVENT, */
+ SCTP_STREAM_CHANGE_EVENT
+ };
+ guint32 i;
+ guint sock_type = self->use_sock_stream ? SOCK_STREAM : SOCK_SEQPACKET;
+
+ if ((sock =
+ usrsctp_socket (AF_CONN, sock_type, IPPROTO_SCTP, receive_cb, NULL, 0,
+ (void *) self)) == NULL)
+ goto error;
+
+ if (usrsctp_set_non_blocking (sock, 1) < 0) {
+ g_warning ("Could not set non-blocking mode on SCTP socket");
+ goto error;
+ }
+
+ memset (&l, 0, sizeof (l));
+ l.l_onoff = 1;
+ l.l_linger = 0;
+ if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_LINGER, (const void *) &l,
+ (socklen_t) sizeof (struct linger)) < 0) {
+ g_warning ("Could not set SO_LINGER on SCTP socket");
+ goto error;
+ }
+
+ if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_NODELAY, &value,
+ sizeof (int))) {
+ g_warning ("Could not set SCTP_NODELAY");
+ goto error;
+ }
+
+ memset (&stream_reset, 0, sizeof (stream_reset));
+ stream_reset.assoc_id = SCTP_ALL_ASSOC;
+ stream_reset.assoc_value = 1;
+ if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
+ &stream_reset, sizeof (stream_reset))) {
+ g_warning ("Could not set SCTP_ENABLE_STREAM_RESET");
+ goto error;
+ }
+
+ memset (&event, 0, sizeof (event));
+ event.se_assoc_id = SCTP_ALL_ASSOC;
+ event.se_on = 1;
+ for (i = 0; i < sizeof (event_types) / sizeof (event_types[0]); i++) {
+ event.se_type = event_types[i];
+ if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EVENT,
+ &event, sizeof (event)) < 0) {
+ g_warning ("Failed to register event %u", event_types[i]);
+ }
+ }
+
+ return sock;
+error:
+ if (sock) {
+ usrsctp_close (sock);
+ g_warning ("Could not create socket. Error: (%u) %s", errno,
+ strerror (errno));
+ errno = 0;
+ sock = NULL;
+ }
+ return NULL;
+}
+
+static struct sockaddr_conn
+get_sctp_socket_address (GstSctpAssociation * gst_sctp_association,
+ guint16 port)
+{
+ struct sockaddr_conn addr;
+
+ memset ((void *) &addr, 0, sizeof (struct sockaddr_conn));
+#ifdef __APPLE__
+ addr.sconn_len = sizeof (struct sockaddr_conn);
+#endif
+ addr.sconn_family = AF_CONN;
+ addr.sconn_port = g_htons (port);
+ addr.sconn_addr = (void *) gst_sctp_association;
+
+ return addr;
+}
+
+static gpointer
+connection_thread_func (GstSctpAssociation * self)
+{
+ /* TODO: Support both server and client role */
+ client_role_connect (self);
+ return NULL;
+}
+
+static gboolean
+client_role_connect (GstSctpAssociation * self)
+{
+ struct sockaddr_conn addr;
+ gint ret;
+
+ g_mutex_lock (&self->association_mutex);
+ addr = get_sctp_socket_address (self, self->local_port);
+ ret =
+ usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr,
+ sizeof (struct sockaddr_conn));
+ if (ret < 0) {
+ g_warning ("usrsctp_bind() error: (%u) %s", errno, strerror (errno));
+ goto error;
+ }
+
+ addr = get_sctp_socket_address (self, self->remote_port);
+ ret =
+ usrsctp_connect (self->sctp_ass_sock, (struct sockaddr *) &addr,
+ sizeof (struct sockaddr_conn));
+ if (ret < 0 && errno != EINPROGRESS) {
+ g_warning ("usrsctp_connect() error: (%u) %s", errno, strerror (errno));
+ goto error;
+ }
+ g_mutex_unlock (&self->association_mutex);
+ return TRUE;
+error:
+ g_mutex_unlock (&self->association_mutex);
+ return FALSE;
+}
+
+static int
+sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
+ guint8 set_df)
+{
+ GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr);
+
+ if (self->packet_out_cb) {
+ self->packet_out_cb (self, buffer, length, self->packet_out_user_data);
+ }
+
+ return 0;
+}
+
+static int
+receive_cb (struct socket *sock, union sctp_sockstore addr, void *data,
+ size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info)
+{
+ GstSctpAssociation *self = GST_SCTP_ASSOCIATION (ulp_info);
+
+ if (!data) {
+ /* Not sure if this can happend. */
+ g_warning ("Received empty data buffer");
+ } else {
+ if (flags & MSG_NOTIFICATION) {
+ handle_notification (self, (const union sctp_notification *) data,
+ datalen);
+ free (data);
+ } else {
+ handle_message (self, data, datalen, rcv_info.rcv_sid,
+ ntohl (rcv_info.rcv_ppid));
+ }
+ }
+
+ return 1;
+}
+
+static void
+handle_notification (GstSctpAssociation * self,
+ const union sctp_notification *notification, size_t length)
+{
+ g_assert (notification->sn_header.sn_length == length);
+
+ switch (notification->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_ASSOC_CHANGE");
+ handle_association_changed (self, &notification->sn_assoc_change);
+ break;
+ case SCTP_PEER_ADDR_CHANGE:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_PEER_ADDR_CHANGE");
+ break;
+ case SCTP_REMOTE_ERROR:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_REMOTE_ERROR");
+ break;
+ case SCTP_SEND_FAILED:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SEND_FAILED");
+ break;
+ case SCTP_SHUTDOWN_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SHUTDOWN_EVENT");
+ break;
+ case SCTP_ADAPTATION_INDICATION:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
+ "Event: SCTP_ADAPTATION_INDICATION");
+ break;
+ case SCTP_PARTIAL_DELIVERY_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
+ "Event: SCTP_PARTIAL_DELIVERY_EVENT");
+ break;
+ case SCTP_AUTHENTICATION_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
+ "Event: SCTP_AUTHENTICATION_EVENT");
+ break;
+ case SCTP_STREAM_RESET_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_STREAM_RESET_EVENT");
+ handle_stream_reset_event (self, &notification->sn_strreset_event);
+ break;
+ case SCTP_SENDER_DRY_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SENDER_DRY_EVENT");
+ break;
+ case SCTP_NOTIFICATIONS_STOPPED_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
+ "Event: SCTP_NOTIFICATIONS_STOPPED_EVENT");
+ break;
+ case SCTP_ASSOC_RESET_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_ASSOC_RESET_EVENT");
+ break;
+ case SCTP_STREAM_CHANGE_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_STREAM_CHANGE_EVENT");
+ break;
+ case SCTP_SEND_FAILED_EVENT:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SEND_FAILED_EVENT");
+ break;
+ default:
+ break;
+ }
+}
+
+static void
+handle_association_changed (GstSctpAssociation * self,
+ const struct sctp_assoc_change *sac)
+{
+ gboolean change_state = FALSE;
+ GstSctpAssociationState new_state;
+
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP_COMM_UP()");
+ g_mutex_lock (&self->association_mutex);
+ if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) {
+ change_state = TRUE;
+ new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED;
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP association connected!");
+ } else if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
+ g_warning ("SCTP association already open");
+ } else {
+ g_warning ("SCTP association in unexpected state");
+ }
+ g_mutex_unlock (&self->association_mutex);
+ break;
+ case SCTP_COMM_LOST:
+ g_warning ("SCTP event SCTP_COMM_LOST received");
+ /* TODO: Tear down association and signal that this has happend */
+ break;
+ case SCTP_RESTART:
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
+ "SCTP event SCTP_RESTART received");
+ break;
+ case SCTP_SHUTDOWN_COMP:
+ g_warning ("SCTP event SCTP_SHUTDOWN_COMP received");
+ /* TODO: Tear down association and signal that this has happend */
+ break;
+ case SCTP_CANT_STR_ASSOC:
+ g_warning ("SCTP event SCTP_CANT_STR_ASSOC received");
+ break;
+ }
+
+ if (change_state)
+ gst_sctp_association_change_state (self, new_state, TRUE);
+}
+
+static void
+handle_stream_reset_event (GstSctpAssociation * self,
+ const struct sctp_stream_reset_event *sr)
+{
+ guint32 i, n;
+ if (!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
+ !(sr->strreset_flags & SCTP_STREAM_RESET_DENIED)) {
+ n = (sr->strreset_length -
+ sizeof (struct sctp_stream_reset_event)) / sizeof (uint16_t);
+ for (i = 0; i < n; i++) {
+ if (sr->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
+ g_signal_emit (self, signals[SIGNAL_STREAM_RESET], 0,
+ sr->strreset_stream_list[i]);
+ }
+ }
+ }
+}
+
+static void
+handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen,
+ guint16 stream_id, guint32 ppid)
+{
+ if (self->packet_received_cb) {
+ self->packet_received_cb (self, data, datalen, stream_id, ppid,
+ self->packet_received_user_data);
+ }
+}
+
+static void
+gst_sctp_association_change_state (GstSctpAssociation * self,
+ GstSctpAssociationState new_state, gboolean notify)
+{
+ self->state = new_state;
+ if (notify)
+ g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]);
+}
diff --git a/ext/sctp/sctpassociation.h b/ext/sctp/sctpassociation.h
new file mode 100644
index 000000000..4510ba70f
--- /dev/null
+++ b/ext/sctp/sctpassociation.h
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifndef __GST_SCTP_ASSOCIATION_H__
+#define __GST_SCTP_ASSOCIATION_H__
+
+#include <glib-object.h>
+#define INET
+#define INET6
+#include <usrsctp.h>
+
+G_BEGIN_DECLS
+
+#define GST_SCTP_TYPE_ASSOCIATION (gst_sctp_association_get_type ())
+#define GST_SCTP_ASSOCIATION(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_SCTP_TYPE_ASSOCIATION, GstSctpAssociation))
+#define GST_SCTP_IS_ASSOCIATION(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_SCTP_TYPE_ASSOCIATION))
+#define GST_SCTP_ASSOCIATION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_SCTP_TYPE_ASSOCIATION, GstSctpAssociationClass))
+#define GST_SCTP_IS_ASSOCIATION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_SCTP_TYPE_ASSOCIATION))
+#define GST_SCTP_ASSOCIATION_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_SCTP_TYPE_ASSOCIATION, GstSctpAssociationClass))
+
+typedef struct _GstSctpAssociation GstSctpAssociation;
+typedef struct _GstSctpAssociationClass GstSctpAssociationClass;
+
+typedef enum
+{
+ GST_SCTP_ASSOCIATION_STATE_NEW,
+ GST_SCTP_ASSOCIATION_STATE_READY,
+ GST_SCTP_ASSOCIATION_STATE_CONNECTING,
+ GST_SCTP_ASSOCIATION_STATE_CONNECTED,
+ GST_SCTP_ASSOCIATION_STATE_DISCONNECTING,
+ GST_SCTP_ASSOCIATION_STATE_DISCONNECTED,
+ GST_SCTP_ASSOCIATION_STATE_ERROR
+} GstSctpAssociationState;
+
+typedef enum
+{
+ GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE = 0x0000,
+ GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL = 0x0001,
+ GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF = 0x0002,
+ GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX = 0x0003
+} GstSctpAssociationPartialReliability;
+
+typedef void (*GstSctpAssociationPacketReceivedCb) (GstSctpAssociation *
+ sctp_association, guint8 * data, gsize length, guint16 stream_id,
+ guint ppid, gpointer user_data);
+typedef void (*GstSctpAssociationPacketOutCb) (GstSctpAssociation *
+ sctp_association, const guint8 * data, gsize length, gpointer user_data);
+
+struct _GstSctpAssociation
+{
+ GObject parent_instance;
+
+ guint32 association_id;
+ guint16 local_port;
+ guint16 remote_port;
+ gboolean use_sock_stream;
+ struct socket *sctp_ass_sock;
+
+ GMutex association_mutex;
+
+ GstSctpAssociationState state;
+
+ GThread *connection_thread;
+
+ GstSctpAssociationPacketReceivedCb packet_received_cb;
+ gpointer packet_received_user_data;
+
+ GstSctpAssociationPacketOutCb packet_out_cb;
+ gpointer packet_out_user_data;
+};
+
+struct _GstSctpAssociationClass
+{
+ GObjectClass parent_class;
+
+ void (*on_sctp_stream_reset) (GstSctpAssociation * sctp_association,
+ guint16 stream_id);
+};
+
+GType gst_sctp_association_get_type (void);
+
+GstSctpAssociation *gst_sctp_association_get (guint32 association_id);
+
+gboolean gst_sctp_association_start (GstSctpAssociation * self);
+void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
+ GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data);
+void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
+ GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data);
+void gst_sctp_association_incoming_packet (GstSctpAssociation * self,
+ guint8 * buf, guint32 length);
+gboolean gst_sctp_association_send_data (GstSctpAssociation * self,
+ guint8 * buf, guint32 length, guint16 stream_id, guint32 ppid,
+ gboolean ordered, GstSctpAssociationPartialReliability pr,
+ guint32 reliability_param);
+void gst_sctp_association_reset_stream (GstSctpAssociation * self,
+ guint16 stream_id);
+void gst_sctp_association_force_close (GstSctpAssociation * self);
+
+G_END_DECLS
+
+#endif /* __GST_SCTP_ASSOCIATION_H__ */