diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/Makefile.am | 8 | ||||
-rw-r--r-- | ext/sctp/Makefile.am | 21 | ||||
-rw-r--r-- | ext/sctp/gstsctpdec.c | 655 | ||||
-rw-r--r-- | ext/sctp/gstsctpdec.h | 66 | ||||
-rw-r--r-- | ext/sctp/gstsctpenc.c | 937 | ||||
-rw-r--r-- | ext/sctp/gstsctpenc.h | 77 | ||||
-rw-r--r-- | ext/sctp/gstsctpplugin.c | 53 | ||||
-rw-r--r-- | ext/sctp/sctpassociation.c | 836 | ||||
-rw-r--r-- | ext/sctp/sctpassociation.h | 123 |
9 files changed, 2776 insertions, 0 deletions
diff --git a/ext/Makefile.am b/ext/Makefile.am index 45131153a..4a18dd869 100644 --- a/ext/Makefile.am +++ b/ext/Makefile.am @@ -286,6 +286,12 @@ else SBC_DIR= endif +if USE_SCTP +SCTP_DIR=sctp +else +SCTP_DIR= +endif + if USE_SMOOTHSTREAMING SMOOTHSTREAMING_DIR = smoothstreaming else @@ -455,6 +461,7 @@ SUBDIRS=\ $(OPUS_DIR) \ $(RSVG_DIR) \ $(SBC_DIR) \ + $(SCTP_DIR) \ $(SMOOTHSTREAMING_DIR) \ $(SMOOTHWAVE_DIR) \ $(SNDFILE_DIR) \ @@ -523,6 +530,7 @@ DIST_SUBDIRS = \ rsvg \ resindvd \ sbc \ + sctp \ smoothstreaming \ sndfile \ soundtouch \ diff --git a/ext/sctp/Makefile.am b/ext/sctp/Makefile.am new file mode 100644 index 000000000..7f535ee69 --- /dev/null +++ b/ext/sctp/Makefile.am @@ -0,0 +1,21 @@ +plugin_LTLIBRARIES = libgstsctp.la + +libgstsctp_la_SOURCES = \ + gstsctpplugin.c \ + sctpassociation.c \ + gstsctpenc.c \ + gstsctpdec.c + +libgstsctp_la_CFLAGS = \ + $(GST_PLUGINS_BASE_CFLAGS) \ + $(GST_BASE_CFLAGS) \ + $(GST_CFLAGS) \ + $(USRSCTP_CFLAGS) \ + -I$(top_srcdir)/gst-libs + +libgstsctp_la_LIBADD = $(GST_LIBS) $(GST_BASE_LIBS) $(USRSCTP_LIBS) $(top_builddir)/gst-libs/gst/sctp/libgstsctp-1.0.la + +noinst_HEADERS = \ + sctpassociation.h \ + gstsctpenc.h \ + gstsctpdec.h diff --git a/ext/sctp/gstsctpdec.c b/ext/sctp/gstsctpdec.c new file mode 100644 index 000000000..ccc5ec12f --- /dev/null +++ b/ext/sctp/gstsctpdec.c @@ -0,0 +1,655 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "gstsctpdec.h" + +#include <gst/sctp/sctpreceivemeta.h> +#include <gst/base/gstdataqueue.h> + +#include <stdio.h> +#include <stdlib.h> + +GST_DEBUG_CATEGORY_STATIC (gst_sctp_dec_debug_category); +#define GST_CAT_DEFAULT gst_sctp_dec_debug_category + +#define gst_sctp_dec_parent_class parent_class +G_DEFINE_TYPE (GstSctpDec, gst_sctp_dec, GST_TYPE_ELEMENT); + +static GstStaticPadTemplate sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, + GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp")); + +static GstStaticPadTemplate src_template = +GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC, + GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY); + +enum +{ + SIGNAL_RESET_STREAM, + NUM_SIGNALS +}; + +static guint signals[NUM_SIGNALS]; + +enum +{ + PROP_0, + + PROP_GST_SCTP_ASSOCIATION_ID, + PROP_LOCAL_SCTP_PORT, + + NUM_PROPERTIES +}; + +static GParamSpec *properties[NUM_PROPERTIES]; + +#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1 +#define DEFAULT_LOCAL_SCTP_PORT 0 +#define MAX_SCTP_PORT 65535 +#define MAX_GST_SCTP_ASSOCIATION_ID 65535 +#define MAX_STREAM_ID 65535 + +GType gst_sctp_dec_pad_get_type (void); + +#define GST_TYPE_SCTP_DEC_PAD (gst_sctp_dec_pad_get_type()) +#define GST_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPad)) +#define GST_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPadClass)) +#define GST_IS_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC_PAD)) +#define GST_IS_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC_PAD)) + +typedef struct _GstSctpDecPad GstSctpDecPad; +typedef GstPadClass GstSctpDecPadClass; + +struct _GstSctpDecPad +{ + GstPad parent; + + GstDataQueue *packet_queue; +}; + +G_DEFINE_TYPE (GstSctpDecPad, gst_sctp_dec_pad, GST_TYPE_PAD); + +static void +gst_sctp_dec_pad_finalize (GObject * object) +{ + GstSctpDecPad *self = GST_SCTP_DEC_PAD (object); + + gst_object_unref (self->packet_queue); + + G_OBJECT_CLASS (gst_sctp_dec_pad_parent_class)->finalize (object); +} + +static gboolean +data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes, + guint64 time, gpointer user_data) +{ + /* FIXME: Are we full at some point and block? */ + return FALSE; +} + +static void +data_queue_empty_cb (GstDataQueue * queue, gpointer user_data) +{ +} + +static void +data_queue_full_cb (GstDataQueue * queue, gpointer user_data) +{ +} + +static void +gst_sctp_dec_pad_class_init (GstSctpDecPadClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->finalize = gst_sctp_dec_pad_finalize; +} + +static void +gst_sctp_dec_pad_init (GstSctpDecPad * self) +{ + self->packet_queue = gst_data_queue_new (data_queue_check_full_cb, + data_queue_full_cb, data_queue_empty_cb, NULL); +} + +static void gst_sctp_dec_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_sctp_dec_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element, + GstStateChange transition); +static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, + GstBuffer * buf); +static gboolean gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, + GstEvent * event); +static void gst_sctp_data_srcpad_loop (GstPad * pad); + +static gboolean configure_association (GstSctpDec * self); +static void on_gst_sctp_association_stream_reset (GstSctpAssociation * + gst_sctp_association, guint16 stream_id, GstSctpDec * self); +static void on_receive (GstSctpAssociation * gst_sctp_association, guint8 * buf, + gsize length, guint16 stream_id, guint ppid, gpointer user_data); +static void stop_srcpad_task (GstPad * pad); +static void stop_all_srcpad_tasks (GstSctpDec * self); +static void sctpdec_cleanup (GstSctpDec * self); +static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id); +static void remove_pad (GstElement * element, GstPad * pad); +static void on_reset_stream (GstSctpDec * self, guint stream_id); + +static void +gst_sctp_dec_class_init (GstSctpDecClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *element_class; + + gobject_class = G_OBJECT_CLASS (klass); + element_class = GST_ELEMENT_CLASS (klass); + + GST_DEBUG_CATEGORY_INIT (gst_sctp_dec_debug_category, + "sctpdec", 0, "debug category for sctpdec element"); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&sink_template)); + + gobject_class->set_property = gst_sctp_dec_set_property; + gobject_class->get_property = gst_sctp_dec_get_property; + + element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state); + + klass->on_reset_stream = on_reset_stream; + + properties[PROP_GST_SCTP_ASSOCIATION_ID] = + g_param_spec_uint ("sctp-association-id", + "SCTP Association ID", + "Every encoder/decoder pair should have the same, unique, sctp-association-id. " + "This value must be set before any pads are requested.", + 0, MAX_GST_SCTP_ASSOCIATION_ID, DEFAULT_GST_SCTP_ASSOCIATION_ID, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_LOCAL_SCTP_PORT] = + g_param_spec_uint ("local-sctp-port", + "Local SCTP port", + "Local sctp port for the sctp association. The remote port is configured via the " + "GstSctpEnc element.", + 0, MAX_SCTP_PORT, DEFAULT_LOCAL_SCTP_PORT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties); + + signals[SIGNAL_RESET_STREAM] = g_signal_new ("reset-stream", + G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstSctpDecClass, on_reset_stream), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT); + + gst_element_class_set_static_metadata (element_class, + "SCTP Decoder", + "Decoder/Network/SCTP", + "Decodes packets with SCTP", + "George Kiagiadakis <george.kiagiadakis@collabora.com>"); +} + +static void +gst_sctp_dec_init (GstSctpDec * self) +{ + self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID; + self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT; + + self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink"); + gst_pad_set_chain_function (self->sink_pad, + GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain)); + gst_pad_set_event_function (self->sink_pad, + GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_packet_event)); + + gst_element_add_pad (GST_ELEMENT (self), self->sink_pad); +} + +static void +gst_sctp_dec_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSctpDec *self = GST_SCTP_DEC (object); + + switch (prop_id) { + case PROP_GST_SCTP_ASSOCIATION_ID: + self->sctp_association_id = g_value_get_uint (value); + break; + case PROP_LOCAL_SCTP_PORT: + self->local_sctp_port = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); + break; + } +} + +static void +gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstSctpDec *self = GST_SCTP_DEC (object); + + switch (prop_id) { + case PROP_GST_SCTP_ASSOCIATION_ID: + g_value_set_uint (value, self->sctp_association_id); + break; + case PROP_LOCAL_SCTP_PORT: + g_value_set_uint (value, self->local_sctp_port); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_sctp_dec_change_state (GstElement * element, GstStateChange transition) +{ + GstSctpDec *self = GST_SCTP_DEC (element); + GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (!configure_association (self)) + ret = GST_STATE_CHANGE_FAILURE; + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + sctpdec_cleanup (self); + break; + default: + break; + } + + if (ret != GST_STATE_CHANGE_FAILURE) + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + return ret; +} + +static GstFlowReturn +gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf) +{ + GstMapInfo map; + + if (!gst_buffer_map (buf, &map, GST_MAP_READ)) { + GST_WARNING_OBJECT (self, "Could not map GstBuffer"); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } + + gst_sctp_association_incoming_packet (self->sctp_association, + (guint8 *) map.data, (guint32) map.size); + gst_buffer_unmap (buf, &map); + gst_buffer_unref (buf); + + return GST_FLOW_OK; +} + +static void +flush_srcpad (const GValue * item, gpointer user_data) +{ + GstSctpDecPad *sctpdec_pad = g_value_get_object (item); + gboolean flush = GPOINTER_TO_INT (user_data); + + if (flush) { + gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE); + gst_data_queue_flush (sctpdec_pad->packet_queue); + } else { + gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE); + gst_pad_start_task (GST_PAD (sctpdec_pad), + (GstTaskFunction) gst_sctp_data_srcpad_loop, sctpdec_pad, NULL); + } +} + +static gboolean +gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event) +{ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_STREAM_START: + case GST_EVENT_CAPS: + /* We create our own stream-start events and the caps event does not + * make sense */ + gst_event_unref (event); + return TRUE; + case GST_EVENT_EOS: + /* Drop this, we're never EOS until shut down */ + gst_event_unref (event); + return TRUE; + case GST_EVENT_FLUSH_START:{ + GstIterator *it; + + it = gst_element_iterate_src_pads (GST_ELEMENT (self)); + while (gst_iterator_foreach (it, flush_srcpad, + GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + gst_iterator_free (it); + + return gst_pad_event_default (pad, GST_OBJECT (self), event); + } + case GST_EVENT_FLUSH_STOP:{ + GstIterator *it; + + it = gst_element_iterate_src_pads (GST_ELEMENT (self)); + while (gst_iterator_foreach (it, flush_srcpad, + GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + gst_iterator_free (it); + + return gst_pad_event_default (pad, GST_OBJECT (self), event); + } + default: + return gst_pad_event_default (pad, GST_OBJECT (self), event); + } +} + +static void +gst_sctp_data_srcpad_loop (GstPad * pad) +{ + GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad); + GstDataQueueItem *item; + + if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) { + GstFlowReturn flow_ret; + + flow_ret = gst_pad_push (pad, GST_BUFFER (item->object)); + item->object = NULL; + if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING + || flow_ret == GST_FLOW_NOT_LINKED)) { + GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s", + gst_flow_get_name (flow_ret)); + } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) { + GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s", + gst_flow_get_name (flow_ret)); + } + + if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) { + GST_DEBUG_OBJECT (pad, "Pausing task because of an error"); + gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE); + gst_data_queue_flush (sctpdec_pad->packet_queue); + gst_pad_pause_task (pad); + } + + item->destroy (item); + } else { + GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing"); + gst_pad_pause_task (pad); + } +} + +static gboolean +configure_association (GstSctpDec * self) +{ + gint state; + + self->sctp_association = gst_sctp_association_get (self->sctp_association_id); + + g_object_get (self->sctp_association, "state", &state, NULL); + + if (state != GST_SCTP_ASSOCIATION_STATE_NEW) { + GST_WARNING_OBJECT (self, + "Could not configure SCTP association. Association already in use!"); + g_object_unref (self->sctp_association); + self->sctp_association = NULL; + goto error; + } + + self->signal_handler_stream_reset = + g_signal_connect_object (self->sctp_association, "stream-reset", + G_CALLBACK (on_gst_sctp_association_stream_reset), self, 0); + + g_object_bind_property (self, "local-sctp-port", self->sctp_association, + "local-port", G_BINDING_SYNC_CREATE); + + gst_sctp_association_set_on_packet_received (self->sctp_association, + on_receive, self); + + return TRUE; +error: + return FALSE; +} + +static gboolean +gst_sctp_dec_src_event (GstPad * pad, GstSctpDec * self, GstEvent * event) +{ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_RECONFIGURE: + case GST_EVENT_FLUSH_STOP:{ + GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad); + + /* Unflush and start task again */ + gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE); + gst_pad_start_task (pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, pad, + NULL); + + return gst_pad_event_default (pad, GST_OBJECT (self), event); + } + case GST_EVENT_FLUSH_START:{ + GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad); + + gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE); + gst_data_queue_flush (sctpdec_pad->packet_queue); + + return gst_pad_event_default (pad, GST_OBJECT (self), event); + } + default: + return gst_pad_event_default (pad, GST_OBJECT (self), event); + } +} + +static gboolean +copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) +{ + GstPad *new_pad = user_data; + + if (GST_EVENT_TYPE (*event) != GST_EVENT_CAPS + && GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START) + gst_pad_store_sticky_event (new_pad, *event); + + return TRUE; +} + +static GstPad * +get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id) +{ + GstPad *new_pad = NULL; + gint state; + gchar *pad_name, *pad_stream_id; + GstPadTemplate *template; + + pad_name = g_strdup_printf ("src_%hu", stream_id); + new_pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name); + if (new_pad) { + g_free (pad_name); + return new_pad; + } + + g_object_get (self->sctp_association, "state", &state, NULL); + + if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) { + GST_WARNING_OBJECT (self, + "The SCTP association must be established before a new stream can be created"); + return NULL; + } + + if (stream_id > MAX_STREAM_ID) + return NULL; + + template = gst_static_pad_template_get (&src_template); + new_pad = g_object_new (GST_TYPE_SCTP_DEC_PAD, "name", pad_name, + "direction", template->direction, "template", template, NULL); + g_free (pad_name); + + gst_pad_set_event_function (new_pad, + GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_src_event)); + + if (!gst_pad_set_active (new_pad, TRUE)) + goto error_cleanup; + + pad_stream_id = + gst_pad_create_stream_id_printf (new_pad, GST_ELEMENT (self), "%hu", + stream_id); + gst_pad_push_event (new_pad, gst_event_new_stream_start (pad_stream_id)); + g_free (pad_stream_id); + gst_pad_sticky_events_foreach (self->sink_pad, copy_sticky_events, new_pad); + + if (!gst_element_add_pad (GST_ELEMENT (self), new_pad)) + goto error_cleanup; + + gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, + new_pad, NULL); + + gst_object_ref (new_pad); + + return new_pad; + +error_cleanup: + gst_object_unref (new_pad); + return NULL; +} + +static void +remove_pad (GstElement * element, GstPad * pad) +{ + stop_srcpad_task (pad); + gst_pad_set_active (pad, FALSE); + gst_element_remove_pad (element, pad); +} + +static void +on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association, + guint16 stream_id, GstSctpDec * self) +{ + gchar *pad_name; + GstPad *srcpad; + + pad_name = g_strdup_printf ("src_%hu", stream_id); + srcpad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name); + g_free (pad_name); + if (!srcpad) { + GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad"); + return; + } + remove_pad (GST_ELEMENT (self), srcpad); + gst_object_unref (srcpad); +} + +static void +data_queue_item_free (GstDataQueueItem * item) +{ + if (item->object) + gst_mini_object_unref (item->object); + g_free (item); +} + +static void +on_receive (GstSctpAssociation * sctp_association, guint8 * buf, gsize length, + guint16 stream_id, guint ppid, gpointer user_data) +{ + GstSctpDec *self = user_data; + GstSctpDecPad *sctpdec_pad; + GstPad *src_pad; + GstDataQueueItem *item; + GstBuffer *gstbuf; + + src_pad = get_pad_for_stream_id (self, stream_id); + g_assert (src_pad); + + sctpdec_pad = GST_SCTP_DEC_PAD (src_pad); + gstbuf = gst_buffer_new_wrapped (buf, length); + gst_sctp_buffer_add_receive_meta (gstbuf, ppid); + + item = g_new0 (GstDataQueueItem, 1); + item->object = GST_MINI_OBJECT (gstbuf); + item->size = length; + item->visible = TRUE; + item->destroy = (GDestroyNotify) data_queue_item_free; + if (!gst_data_queue_push (sctpdec_pad->packet_queue, item)) { + item->destroy (item); + GST_DEBUG_OBJECT (src_pad, "Failed to push item because we're flushing"); + } + + gst_object_unref (src_pad); +} + +static void +stop_srcpad_task (GstPad * pad) +{ + GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad); + + gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE); + gst_data_queue_flush (sctpdec_pad->packet_queue); + gst_pad_stop_task (pad); +} + +static void +remove_pad_it (const GValue * item, gpointer user_data) +{ + GstPad *pad = g_value_get_object (item); + GstSctpDec *self = user_data; + + remove_pad (GST_ELEMENT (self), pad); +} + +static void +stop_all_srcpad_tasks (GstSctpDec * self) +{ + GstIterator *it; + + it = gst_element_iterate_src_pads (GST_ELEMENT (self)); + while (gst_iterator_foreach (it, remove_pad_it, self) == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + gst_iterator_free (it); +} + +static void +sctpdec_cleanup (GstSctpDec * self) +{ + if (self->sctp_association) { + gst_sctp_association_set_on_packet_received (self->sctp_association, NULL, + NULL); + g_signal_handler_disconnect (self->sctp_association, + self->signal_handler_stream_reset); + stop_all_srcpad_tasks (self); + gst_sctp_association_force_close (self->sctp_association); + g_object_unref (self->sctp_association); + self->sctp_association = NULL; + } +} + +static void +on_reset_stream (GstSctpDec * self, guint stream_id) +{ + if (self->sctp_association) { + gst_sctp_association_reset_stream (self->sctp_association, stream_id); + on_gst_sctp_association_stream_reset (self->sctp_association, stream_id, + self); + } +} diff --git a/ext/sctp/gstsctpdec.h b/ext/sctp/gstsctpdec.h new file mode 100644 index 000000000..845fac4d4 --- /dev/null +++ b/ext/sctp/gstsctpdec.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifndef __GST_SCTP_DEC_H__ +#define __GST_SCTP_DEC_H__ + +#include <gst/gst.h> + +#include "sctpassociation.h" + +G_BEGIN_DECLS + +#define GST_TYPE_SCTP_DEC (gst_sctp_dec_get_type()) +#define GST_SCTP_DEC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC, GstSctpDec)) +#define GST_SCTP_DEC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC, GstSctpDecClass)) +#define GST_IS_SCTP_DEC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC)) +#define GST_IS_SCTP_DEC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC)) +typedef struct _GstSctpDec GstSctpDec; +typedef struct _GstSctpDecClass GstSctpDecClass; + +struct _GstSctpDec +{ + GstElement element; + + GstPad *sink_pad; + guint sctp_association_id; + guint local_sctp_port; + + GstSctpAssociation *sctp_association; + gulong signal_handler_stream_reset; +}; + +struct _GstSctpDecClass +{ + GstElementClass parent_class; + + void (*on_reset_stream) (GstSctpDec * sctp_dec, guint stream_id); +}; + +GType gst_sctp_dec_get_type (void); + +G_END_DECLS + +#endif /* __GST_SCTP_DEC_H__ */ diff --git a/ext/sctp/gstsctpenc.c b/ext/sctp/gstsctpenc.c new file mode 100644 index 000000000..80a338f33 --- /dev/null +++ b/ext/sctp/gstsctpenc.c @@ -0,0 +1,937 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "gstsctpenc.h" + +#include <gst/sctp/sctpsendmeta.h> +#include <stdio.h> + +GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category); +#define GST_CAT_DEFAULT gst_sctp_enc_debug_category + +#define gst_sctp_enc_parent_class parent_class +G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT); + +static GstStaticPadTemplate sink_template = +GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, + GST_PAD_REQUEST, GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate src_template = +GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, + GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp")); + +enum +{ + SIGNAL_SCTP_ASSOCIATION_ESTABLISHED, + SIGNAL_GET_STREAM_BYTES_SENT, + NUM_SIGNALS +}; + +static guint signals[NUM_SIGNALS]; + +enum +{ + PROP_0, + + PROP_GST_SCTP_ASSOCIATION_ID, + PROP_REMOTE_SCTP_PORT, + PROP_USE_SOCK_STREAM, + + NUM_PROPERTIES +}; + +static GParamSpec *properties[NUM_PROPERTIES]; + +#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1 +#define DEFAULT_REMOTE_SCTP_PORT 0 +#define DEFAULT_GST_SCTP_ORDERED TRUE +#define DEFAULT_SCTP_PPID 1 +#define DEFAULT_USE_SOCK_STREAM FALSE + +#define BUFFER_FULL_SLEEP_TIME 100000 + +GType gst_sctp_enc_pad_get_type (void); + +#define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type()) +#define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad)) +#define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass)) +#define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD)) +#define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD)) + +typedef struct _GstSctpEncPad GstSctpEncPad; +typedef GstPadClass GstSctpEncPadClass; + +struct _GstSctpEncPad +{ + GstPad parent; + + guint16 stream_id; + gboolean ordered; + guint32 ppid; + GstSctpAssociationPartialReliability reliability; + guint32 reliability_param; + + guint64 bytes_sent; + + GMutex lock; + GCond cond; + gboolean flushing; +}; + +G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD); + +static void +gst_sctp_enc_pad_finalize (GObject * object) +{ + GstSctpEncPad *self = GST_SCTP_ENC_PAD (object); + + g_cond_clear (&self->cond); + g_mutex_clear (&self->lock); + + G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object); +} + +static void +gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + + gobject_class->finalize = gst_sctp_enc_pad_finalize; +} + +static void +gst_sctp_enc_pad_init (GstSctpEncPad * self) +{ + g_mutex_init (&self->lock); + g_cond_init (&self->cond); + self->flushing = FALSE; +} + +static void gst_sctp_enc_finalize (GObject * object); +static void gst_sctp_enc_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_sctp_enc_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element, + GstStateChange transition); +static GstPad *gst_sctp_enc_request_new_pad (GstElement * element, + GstPadTemplate * template, const gchar * name, const GstCaps * caps); +static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad); +static void gst_sctp_enc_srcpad_loop (GstPad * pad); +static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); +static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static void on_sctp_association_state_changed (GstSctpAssociation * + sctp_association, GParamSpec * pspec, GstSctpEnc * self); + +static gboolean configure_association (GstSctpEnc * self); +static void on_sctp_packet_out (GstSctpAssociation * sctp_association, + const guint8 * buf, gsize length, gpointer user_data); +static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self); +static void sctpenc_cleanup (GstSctpEnc * self); +static void get_config_from_caps (const GstCaps * caps, gboolean * ordered, + GstSctpAssociationPartialReliability * reliability, + guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available); +static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id); + +static void +gst_sctp_enc_class_init (GstSctpEncClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *element_class; + + gobject_class = (GObjectClass *) klass; + element_class = (GstElementClass *) klass; + + GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category, + "sctpenc", 0, "debug category for sctpenc element"); + + gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass), + gst_static_pad_template_get (&src_template)); + gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass), + gst_static_pad_template_get (&sink_template)); + + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property); + + element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state); + element_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad); + element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad); + + properties[PROP_GST_SCTP_ASSOCIATION_ID] = + g_param_spec_uint ("sctp-association-id", + "SCTP Association ID", + "Every encoder/decoder pair should have the same, unique, sctp-association-id. " + "This value must be set before any pads are requested.", + 0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_REMOTE_SCTP_PORT] = + g_param_spec_uint ("remote-sctp-port", + "Remote SCTP port", + "Sctp remote sctp port for the sctp association. The local port is configured via the " + "GstSctpDec element.", + 0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_USE_SOCK_STREAM] = + g_param_spec_boolean ("use-sock-stream", + "Use sock-stream", + "When set to TRUE, a sequenced, reliable, connection-based connection is used." + "When TRUE the partial reliability parameters of the channel are ignored.", + DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties); + + signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] = + g_signal_new ("sctp-association-established", + G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BOOLEAN); + + signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent", + G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_UINT64, 1, G_TYPE_UINT); + + klass->on_get_stream_bytes_sent = + GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent); + + gst_element_class_set_static_metadata (element_class, + "SCTP Encoder", + "Encoder/Network/SCTP", + "Encodes packets with SCTP", + "George Kiagiadakis <george.kiagiadakis@collabora.com>"); +} + +static gboolean +data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes, + guint64 time, gpointer user_data) +{ + /* TODO: When are we considered full? */ + return FALSE; +} + +static void +data_queue_empty_cb (GstDataQueue * queue, gpointer user_data) +{ +} + +static void +data_queue_full_cb (GstDataQueue * queue, gpointer user_data) +{ +} + +static void +gst_sctp_enc_init (GstSctpEnc * self) +{ + self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID; + self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT; + + self->sctp_association = NULL; + self->outbound_sctp_packet_queue = + gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb, + data_queue_empty_cb, NULL); + + self->src_pad = gst_pad_new_from_static_template (&src_template, "src"); + gst_pad_set_event_function (self->src_pad, + GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event)); + gst_element_add_pad (GST_ELEMENT (self), self->src_pad); + + g_queue_init (&self->pending_pads); +} + +static void +gst_sctp_enc_finalize (GObject * object) +{ + GstSctpEnc *self = GST_SCTP_ENC (object); + + g_queue_clear (&self->pending_pads); + gst_object_unref (self->outbound_sctp_packet_queue); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_sctp_enc_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSctpEnc *self = GST_SCTP_ENC (object); + + switch (prop_id) { + case PROP_GST_SCTP_ASSOCIATION_ID: + self->sctp_association_id = g_value_get_uint (value); + break; + case PROP_REMOTE_SCTP_PORT: + self->remote_sctp_port = g_value_get_uint (value); + break; + case PROP_USE_SOCK_STREAM: + self->use_sock_stream = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); + break; + } +} + +static void +gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstSctpEnc *self = GST_SCTP_ENC (object); + + switch (prop_id) { + case PROP_GST_SCTP_ASSOCIATION_ID: + g_value_set_uint (value, self->sctp_association_id); + break; + case PROP_REMOTE_SCTP_PORT: + g_value_set_uint (value, self->remote_sctp_port); + break; + case PROP_USE_SOCK_STREAM: + g_value_set_boolean (value, self->use_sock_stream); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_sctp_enc_change_state (GstElement * element, GstStateChange transition) +{ + GstSctpEnc *self = GST_SCTP_ENC (element); + GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE; + gboolean res = TRUE; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + + case GST_STATE_CHANGE_READY_TO_PAUSED: + self->need_segment = self->need_stream_start_caps = TRUE; + gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE); + gst_pad_start_task (self->src_pad, + (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL); + res = configure_association (self); + break; + + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + + case GST_STATE_CHANGE_PAUSED_TO_READY: + sctpenc_cleanup (self); + break; + + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + + if (res) + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + return ret; +} + +static GstPad * +gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template, + const gchar * new_pad_name, const GstCaps * caps) +{ + GstSctpEnc *self = GST_SCTP_ENC (element); + GstPad *new_pad = NULL; + GstSctpEncPad *sctpenc_pad; + guint32 stream_id; + gint state; + guint32 new_ppid; + gboolean is_new_ppid; + + g_object_get (self->sctp_association, "state", &state, NULL); + + if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) { + g_warning + ("The SCTP association must be established before a new stream can be created"); + goto invalid_state; + } + + if (!template) + goto invalid_parameter; + + if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1) + || stream_id > 65534) /* 65535 is not a valid stream id */ + goto invalid_parameter; + + new_pad = gst_element_get_static_pad (element, new_pad_name); + if (new_pad) { + gst_object_unref (new_pad); + new_pad = NULL; + goto invalid_parameter; + } + + new_pad = + g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction", + template->direction, "template", template, NULL); + gst_pad_set_chain_function (new_pad, + GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain)); + gst_pad_set_event_function (new_pad, + GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event)); + + sctpenc_pad = GST_SCTP_ENC_PAD (new_pad); + sctpenc_pad->stream_id = stream_id; + sctpenc_pad->ppid = DEFAULT_SCTP_PPID; + + get_config_from_caps (caps, &sctpenc_pad->ordered, &sctpenc_pad->reliability, + &sctpenc_pad->reliability_param, &new_ppid, &is_new_ppid); + + if (is_new_ppid) + sctpenc_pad->ppid = new_ppid; + sctpenc_pad->flushing = FALSE; + + if (!gst_pad_set_active (new_pad, TRUE)) + goto error_cleanup; + + if (!gst_element_add_pad (element, new_pad)) + goto error_cleanup; + +invalid_state: +invalid_parameter: + return new_pad; +error_cleanup: + gst_object_unref (new_pad); + return NULL; +} + +static void +gst_sctp_enc_release_pad (GstElement * element, GstPad * pad) +{ + GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); + GstSctpEnc *self; + guint stream_id = 0; + + self = GST_SCTP_ENC (element); + + g_mutex_lock (&sctpenc_pad->lock); + sctpenc_pad->flushing = TRUE; + g_cond_signal (&sctpenc_pad->cond); + g_mutex_unlock (&sctpenc_pad->lock); + + stream_id = sctpenc_pad->stream_id; + gst_pad_set_active (pad, FALSE); + + if (self->sctp_association) + gst_sctp_association_reset_stream (self->sctp_association, stream_id); + + gst_element_remove_pad (element, pad); +} + +static void +gst_sctp_enc_srcpad_loop (GstPad * pad) +{ + GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad)); + GstFlowReturn flow_ret; + GstDataQueueItem *item; + + if (self->need_stream_start_caps) { + gchar s_id[32]; + GstCaps *caps; + + g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ()); + gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id)); + + caps = gst_caps_new_empty_simple ("application/x-sctp"); + gst_pad_set_caps (self->src_pad, caps); + gst_caps_unref (caps); + + self->need_stream_start_caps = FALSE; + } + + if (self->need_segment) { + GstSegment segment; + + gst_segment_init (&segment, GST_FORMAT_BYTES); + gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment)); + + self->need_segment = FALSE; + } + + if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) { + flow_ret = gst_pad_push (self->src_pad, GST_BUFFER (item->object)); + item->object = NULL; + + if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING + || flow_ret == GST_FLOW_NOT_LINKED)) { + GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s", + gst_flow_get_name (flow_ret)); + } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) { + GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s", + gst_flow_get_name (flow_ret)); + } + + if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) { + GST_DEBUG_OBJECT (pad, "Pausing task because of an error"); + gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE); + gst_data_queue_flush (self->outbound_sctp_packet_queue); + gst_pad_pause_task (pad); + } + + item->destroy (item); + } else { + GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing"); + gst_pad_pause_task (pad); + } +} + +static GstFlowReturn +gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstSctpEnc *self = GST_SCTP_ENC (parent); + GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); + GstMapInfo map; + guint32 ppid; + gboolean ordered; + GstSctpAssociationPartialReliability pr; + guint32 pr_param; + gpointer state = NULL; + GstMeta *meta; + const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO; + GstFlowReturn flow_ret = GST_FLOW_ERROR; + + ppid = sctpenc_pad->ppid; + ordered = sctpenc_pad->ordered; + pr = sctpenc_pad->reliability; + pr_param = sctpenc_pad->reliability_param; + + while ((meta = gst_buffer_iterate_meta (buffer, &state))) { + if (meta->info->api == meta_info->api) { + GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta; + + ppid = sctp_send_meta->ppid; + ordered = sctp_send_meta->ordered; + pr_param = sctp_send_meta->pr_param; + switch (sctp_send_meta->pr) { + case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE: + pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE; + break; + case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX: + pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX; + break; + case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF: + pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF; + break; + case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL: + pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL; + break; + } + break; + } + } + + if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) { + g_warning ("Could not map GstBuffer"); + goto error; + } + + g_mutex_lock (&sctpenc_pad->lock); + while (!sctpenc_pad->flushing) { + gboolean data_sent = FALSE; + + g_mutex_unlock (&sctpenc_pad->lock); + + data_sent = + gst_sctp_association_send_data (self->sctp_association, map.data, + map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param); + + g_mutex_lock (&sctpenc_pad->lock); + if (data_sent) { + sctpenc_pad->bytes_sent += map.size; + break; + } else if (!sctpenc_pad->flushing) { + gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME; + + /* The buffer was probably full. Retry in a while */ + GST_OBJECT_LOCK (self); + g_queue_push_tail (&self->pending_pads, sctpenc_pad); + GST_OBJECT_UNLOCK (self); + + g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time); + + GST_OBJECT_LOCK (self); + g_queue_remove (&self->pending_pads, sctpenc_pad); + GST_OBJECT_UNLOCK (self); + } + } + flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK; + g_mutex_unlock (&sctpenc_pad->lock); + + gst_buffer_unmap (buffer, &map); +error: + gst_buffer_unref (buffer); + return flow_ret; +} + +static gboolean +gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); + gboolean ret, is_new_ppid; + guint32 new_ppid; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS:{ + GstCaps *caps; + + gst_event_parse_caps (event, &caps); + get_config_from_caps (caps, &sctpenc_pad->ordered, + &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid, + &is_new_ppid); + if (is_new_ppid) + sctpenc_pad->ppid = new_ppid; + gst_event_unref (event); + ret = TRUE; + break; + } + case GST_EVENT_STREAM_START: + case GST_EVENT_SEGMENT: + /* Drop these, we create our own */ + ret = TRUE; + gst_event_unref (event); + break; + case GST_EVENT_EOS: + /* Drop this, we're never EOS until shut down */ + ret = TRUE; + gst_event_unref (event); + break; + case GST_EVENT_FLUSH_START: + g_mutex_lock (&sctpenc_pad->lock); + sctpenc_pad->flushing = TRUE; + g_cond_signal (&sctpenc_pad->cond); + g_mutex_unlock (&sctpenc_pad->lock); + + ret = gst_pad_event_default (pad, parent, event); + break; + case GST_EVENT_FLUSH_STOP: + sctpenc_pad->flushing = FALSE; + ret = gst_pad_event_default (pad, parent, event); + break; + default: + ret = gst_pad_event_default (pad, parent, event); + break; + } + return ret; +} + +static void +flush_sinkpad (const GValue * item, gpointer user_data) +{ + GstSctpEncPad *sctpenc_pad = g_value_get_object (item); + gboolean flush = GPOINTER_TO_INT (user_data); + + if (flush) { + g_mutex_lock (&sctpenc_pad->lock); + sctpenc_pad->flushing = TRUE; + g_cond_signal (&sctpenc_pad->cond); + g_mutex_unlock (&sctpenc_pad->lock); + } else { + sctpenc_pad->flushing = FALSE; + } +} + +static gboolean +gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstSctpEnc *self = GST_SCTP_ENC (parent); + gboolean ret; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START:{ + GstIterator *it; + + gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE); + gst_data_queue_flush (self->outbound_sctp_packet_queue); + + it = gst_element_iterate_sink_pads (GST_ELEMENT (self)); + while (gst_iterator_foreach (it, flush_sinkpad, + GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + gst_iterator_free (it); + + ret = gst_pad_event_default (pad, parent, event); + break; + } + case GST_EVENT_RECONFIGURE: + case GST_EVENT_FLUSH_STOP:{ + GstIterator *it; + + it = gst_element_iterate_sink_pads (GST_ELEMENT (self)); + while (gst_iterator_foreach (it, flush_sinkpad, + GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + gst_iterator_free (it); + + gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE); + self->need_segment = TRUE; + gst_pad_start_task (self->src_pad, + (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL); + + ret = gst_pad_event_default (pad, parent, event); + break; + } + default: + ret = gst_pad_event_default (pad, parent, event); + break; + } + return ret; +} + +static gboolean +configure_association (GstSctpEnc * self) +{ + gint state; + + self->sctp_association = gst_sctp_association_get (self->sctp_association_id); + + g_object_get (self->sctp_association, "state", &state, NULL); + + if (state != GST_SCTP_ASSOCIATION_STATE_NEW) { + GST_WARNING_OBJECT (self, + "Could not configure SCTP association. Association already in use!"); + g_object_unref (self->sctp_association); + self->sctp_association = NULL; + goto error; + } + + self->signal_handler_state_changed = + g_signal_connect_object (self->sctp_association, "notify::state", + G_CALLBACK (on_sctp_association_state_changed), self, 0); + + g_object_bind_property (self, "remote-sctp-port", self->sctp_association, + "remote-port", G_BINDING_SYNC_CREATE); + + g_object_bind_property (self, "use-sock-stream", self->sctp_association, + "use-sock-stream", G_BINDING_SYNC_CREATE); + + gst_sctp_association_set_on_packet_out (self->sctp_association, + on_sctp_packet_out, self); + + return TRUE; +error: + return FALSE; +} + +static void +on_sctp_association_state_changed (GstSctpAssociation * sctp_association, + GParamSpec * pspec, GstSctpEnc * self) +{ + gint state; + + g_object_get (sctp_association, "state", &state, NULL); + switch (state) { + case GST_SCTP_ASSOCIATION_STATE_NEW: + break; + case GST_SCTP_ASSOCIATION_STATE_READY: + gst_sctp_association_start (sctp_association); + break; + case GST_SCTP_ASSOCIATION_STATE_CONNECTING: + break; + case GST_SCTP_ASSOCIATION_STATE_CONNECTED: + g_signal_emit_by_name (self, "sctp-association-established", TRUE); + break; + case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING: + g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0, + FALSE); + break; + case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED: + break; + case GST_SCTP_ASSOCIATION_STATE_ERROR: + break; + } +} + +static void +data_queue_item_free (GstDataQueueItem * item) +{ + if (item->object) + gst_mini_object_unref (item->object); + g_free (item); +} + +static void +on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf, + gsize length, gpointer user_data) +{ + GstSctpEnc *self = user_data; + GstBuffer *gstbuf; + GstDataQueueItem *item; + GList *pending_pads, *l; + GstSctpEncPad *sctpenc_pad; + + gstbuf = gst_buffer_new_wrapped (g_memdup (buf, length), length); + + item = g_new0 (GstDataQueueItem, 1); + item->object = GST_MINI_OBJECT (gstbuf); + item->size = length; + item->visible = TRUE; + item->destroy = (GDestroyNotify) data_queue_item_free; + + if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) { + item->destroy (item); + GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing"); + } + + /* Wake up pads in the order they waited, oldest pad first */ + GST_OBJECT_LOCK (self); + pending_pads = NULL; + while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) { + pending_pads = g_list_prepend (pending_pads, sctpenc_pad); + } + GST_OBJECT_UNLOCK (self); + + for (l = pending_pads; l; l = l->next) { + sctpenc_pad = l->data; + g_mutex_lock (&sctpenc_pad->lock); + g_cond_signal (&sctpenc_pad->cond); + g_mutex_unlock (&sctpenc_pad->lock); + } + g_list_free (pending_pads); +} + +static void +stop_srcpad_task (GstPad * pad, GstSctpEnc * self) +{ + gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE); + gst_data_queue_flush (self->outbound_sctp_packet_queue); + gst_pad_stop_task (pad); +} + +static void +remove_sinkpad (const GValue * item, gpointer user_data) +{ + GstSctpEncPad *sctpenc_pad = g_value_get_object (item); + GstSctpEnc *self = user_data; + + gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad)); +} + +static void +sctpenc_cleanup (GstSctpEnc * self) +{ + GstIterator *it; + + gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); + g_signal_handler_disconnect (self->sctp_association, + self->signal_handler_state_changed); + stop_srcpad_task (self->src_pad, self); + gst_sctp_association_force_close (self->sctp_association); + g_object_unref (self->sctp_association); + self->sctp_association = NULL; + + it = gst_element_iterate_sink_pads (GST_ELEMENT (self)); + while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + gst_iterator_free (it); + g_queue_clear (&self->pending_pads); +} + +static void +get_config_from_caps (const GstCaps * caps, gboolean * ordered, + GstSctpAssociationPartialReliability * reliability, + guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available) +{ + GstStructure *s; + guint i, n; + + *ordered = TRUE; + *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE; + *reliability_param = 0; + *ppid_available = FALSE; + + n = gst_caps_get_size (caps); + for (i = 0; i < n; i++) { + s = gst_caps_get_structure (caps, i); + if (gst_structure_has_field (s, "ordered")) { + const GValue *v = gst_structure_get_value (s, "ordered"); + *ordered = g_value_get_boolean (v); + } + if (gst_structure_has_field (s, "partially-reliability")) { + const GValue *v = gst_structure_get_value (s, "partially-reliability"); + const gchar *reliability_string = g_value_get_string (v); + + if (!g_strcmp0 (reliability_string, "none")) + *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE; + else if (!g_strcmp0 (reliability_string, "ttl")) + *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL; + else if (!g_strcmp0 (reliability_string, "buf")) + *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF; + else if (!g_strcmp0 (reliability_string, "rtx")) + *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX; + } + if (gst_structure_has_field (s, "reliability-parameter")) { + const GValue *v = gst_structure_get_value (s, "reliability-parameter"); + *reliability_param = g_value_get_uint (v); + } + if (gst_structure_has_field (s, "ppid")) { + const GValue *v = gst_structure_get_value (s, "ppid"); + *ppid = g_value_get_uint (v); + *ppid_available = TRUE; + } + } +} + +static guint64 +on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id) +{ + gchar *pad_name; + GstPad *pad; + GstSctpEncPad *sctpenc_pad; + guint64 bytes_sent; + + pad_name = g_strdup_printf ("sink_%u", stream_id); + pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name); + g_free (pad_name); + + if (!pad) { + GST_DEBUG_OBJECT (self, + "Buffered amount requested on a stream that does not exist!"); + return 0; + } + + sctpenc_pad = GST_SCTP_ENC_PAD (pad); + + g_mutex_lock (&sctpenc_pad->lock); + bytes_sent = sctpenc_pad->bytes_sent; + g_mutex_unlock (&sctpenc_pad->lock); + + gst_object_unref (sctpenc_pad); + + return bytes_sent; +} diff --git a/ext/sctp/gstsctpenc.h b/ext/sctp/gstsctpenc.h new file mode 100644 index 000000000..a05bd4e8d --- /dev/null +++ b/ext/sctp/gstsctpenc.h @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifndef __GST_SCTP_ENC_H__ +#define __GST_SCTP_ENC_H__ + +#include <gst/gst.h> +#include <gst/base/base.h> +#include "sctpassociation.h" + +G_BEGIN_DECLS + +#define GST_TYPE_SCTP_ENC (gst_sctp_enc_get_type()) +#define GST_SCTP_ENC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC, GstSctpEnc)) +#define GST_SCTP_ENC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC, GstSctpEncClass)) +#define GST_IS_SCTP_ENC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC)) +#define GST_IS_SCTP_ENC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC)) +typedef struct _GstSctpEnc GstSctpEnc; +typedef struct _GstSctpEncClass GstSctpEncClass; +typedef struct _GstSctpEncPrivate GstSctpEncPrivate; + +struct _GstSctpEnc +{ + GstElement element; + + GstPad *src_pad; + gboolean need_stream_start_caps, need_segment; + guint32 sctp_association_id; + guint16 remote_sctp_port; + gboolean use_sock_stream; + + GstSctpAssociation *sctp_association; + GstDataQueue *outbound_sctp_packet_queue; + + GQueue pending_pads; + + gulong signal_handler_state_changed; +}; + +struct _GstSctpEncClass +{ + GstElementClass parent_class; + + void (*on_sctp_association_is_established) (GstSctpEnc * sctp_enc, + gboolean established); + guint64 (*on_get_stream_bytes_sent) (GstSctpEnc * sctp_enc, + guint stream_id); + +}; + +GType gst_sctp_enc_get_type (void); + +G_END_DECLS + +#endif /* __GST_SCTP_ENC_H__ */ diff --git a/ext/sctp/gstsctpplugin.c b/ext/sctp/gstsctpplugin.c new file mode 100644 index 000000000..888a94c84 --- /dev/null +++ b/ext/sctp/gstsctpplugin.c @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsctpdec.h" +#include "gstsctpenc.h" + +#include <gst/gst.h> + +static gboolean +plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "sctpenc", GST_RANK_NONE, + GST_TYPE_SCTP_ENC) + && gst_element_register (plugin, "sctpdec", GST_RANK_NONE, + GST_TYPE_SCTP_DEC); +} + + +#ifndef PACKAGE +#define PACKAGE "sctp" +#endif + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + sctp, + "SCTP encoder/decoder plugin", + plugin_init, PACKAGE_VERSION, "BSD", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/ext/sctp/sctpassociation.c b/ext/sctp/sctpassociation.c new file mode 100644 index 000000000..b7c686426 --- /dev/null +++ b/ext/sctp/sctpassociation.c @@ -0,0 +1,836 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "sctpassociation.h" + +#include <string.h> +#include <errno.h> +#include <stdlib.h> + +#define GST_SCTP_ASSOCIATION_STATE_TYPE (gst_sctp_association_state_get_type()) +static GType +gst_sctp_association_state_get_type (void) +{ + static const GEnumValue values[] = { + {GST_SCTP_ASSOCIATION_STATE_NEW, "state-new", "state-new"}, + {GST_SCTP_ASSOCIATION_STATE_READY, "state-ready", "state-ready"}, + {GST_SCTP_ASSOCIATION_STATE_CONNECTING, "state-connecting", + "state-connecting"}, + {GST_SCTP_ASSOCIATION_STATE_CONNECTED, "state-connected", + "state-connected"}, + {GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, "state-disconnecting", + "state-disconnecting"}, + {GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, "state-disconnected", + "state-disconnected"}, + {GST_SCTP_ASSOCIATION_STATE_ERROR, "state-error", "state-error"}, + {0, NULL, NULL} + }; + static volatile GType id = 0; + + if (g_once_init_enter ((gsize *) & id)) { + GType _id; + _id = g_enum_register_static ("GstSctpAssociationState", values); + g_once_init_leave ((gsize *) & id, _id); + } + + return id; +} + +G_DEFINE_TYPE (GstSctpAssociation, gst_sctp_association, G_TYPE_OBJECT); + +enum +{ + SIGNAL_STREAM_RESET, + LAST_SIGNAL +}; + + +enum +{ + PROP_0, + + PROP_ASSOCIATION_ID, + PROP_LOCAL_PORT, + PROP_REMOTE_PORT, + PROP_STATE, + PROP_USE_SOCK_STREAM, + + NUM_PROPERTIES +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +static GParamSpec *properties[NUM_PROPERTIES]; + +#define DEFAULT_NUMBER_OF_SCTP_STREAMS 10 +#define DEFAULT_LOCAL_SCTP_PORT 0 +#define DEFAULT_REMOTE_SCTP_PORT 0 + +static GHashTable *associations = NULL; +G_LOCK_DEFINE_STATIC (associations_lock); +static guint32 number_of_associations = 0; + +/* Interface implementations */ +static void gst_sctp_association_finalize (GObject * object); +static void gst_sctp_association_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_sctp_association_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static struct socket *create_sctp_socket (GstSctpAssociation * + gst_sctp_association); +static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation * + gst_sctp_association, guint16 port); +static gpointer connection_thread_func (GstSctpAssociation * self); +static gboolean client_role_connect (GstSctpAssociation * self); +static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos, + guint8 set_df); +static int receive_cb (struct socket *sock, union sctp_sockstore addr, + void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, + void *ulp_info); +static void handle_notification (GstSctpAssociation * self, + const union sctp_notification *notification, size_t length); +static void handle_association_changed (GstSctpAssociation * self, + const struct sctp_assoc_change *sac); +static void handle_stream_reset_event (GstSctpAssociation * self, + const struct sctp_stream_reset_event *ssr); +static void handle_message (GstSctpAssociation * self, guint8 * data, + guint32 datalen, guint16 stream_id, guint32 ppid); + +static void maybe_set_state_to_ready (GstSctpAssociation * self); +static void gst_sctp_association_change_state (GstSctpAssociation * self, + GstSctpAssociationState new_state, gboolean notify); + +static void +gst_sctp_association_class_init (GstSctpAssociationClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + gobject_class->finalize = gst_sctp_association_finalize; + gobject_class->set_property = gst_sctp_association_set_property; + gobject_class->get_property = gst_sctp_association_get_property; + + signals[SIGNAL_STREAM_RESET] = + g_signal_new ("stream-reset", G_OBJECT_CLASS_TYPE (klass), + G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSctpAssociationClass, + on_sctp_stream_reset), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, G_TYPE_UINT); + + properties[PROP_ASSOCIATION_ID] = g_param_spec_uint ("association-id", + "The SCTP association-id", "The SCTP association-id.", 0, G_MAXUSHORT, + DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_LOCAL_PORT] = g_param_spec_uint ("local-port", "Local SCTP", + "The local SCTP port for this association", 0, G_MAXUSHORT, + DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_REMOTE_PORT] = + g_param_spec_uint ("remote-port", "Remote SCTP", + "The remote SCTP port for this association", 0, G_MAXUSHORT, + DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_STATE] = g_param_spec_enum ("state", "SCTP Association state", + "The state of the SCTP association", GST_SCTP_ASSOCIATION_STATE_TYPE, + GST_SCTP_ASSOCIATION_STATE_NEW, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_USE_SOCK_STREAM] = + g_param_spec_boolean ("use-sock-stream", "Use sock-stream", + "When set to TRUE, a sequenced, reliable, connection-based connection is used." + "When TRUE the partial reliability parameters of the channel is ignored.", + FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties); +} + +static void +gst_sctp_association_init (GstSctpAssociation * self) +{ + /* No need to lock mutex here as long as the function is only called from gst_sctp_association_get */ + if (number_of_associations == 0) { + usrsctp_init (0, sctp_packet_out, g_print); + + /* Explicit Congestion Notification */ + usrsctp_sysctl_set_sctp_ecn_enable (0); + + usrsctp_sysctl_set_sctp_nr_outgoing_streams_default + (DEFAULT_NUMBER_OF_SCTP_STREAMS); + } + number_of_associations++; + + self->local_port = DEFAULT_LOCAL_SCTP_PORT; + self->remote_port = DEFAULT_REMOTE_SCTP_PORT; + self->sctp_ass_sock = NULL; + + self->connection_thread = NULL; + g_mutex_init (&self->association_mutex); + + self->state = GST_SCTP_ASSOCIATION_STATE_NEW; + + self->use_sock_stream = FALSE; + + usrsctp_register_address ((void *) self); +} + +static void +gst_sctp_association_finalize (GObject * object) +{ + GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); + + G_LOCK (associations_lock); + + g_hash_table_remove (associations, GUINT_TO_POINTER (self->association_id)); + + usrsctp_deregister_address ((void *) self); + number_of_associations--; + if (number_of_associations == 0) { + usrsctp_finish (); + } + G_UNLOCK (associations_lock); + + g_thread_join (self->connection_thread); + + G_OBJECT_CLASS (gst_sctp_association_parent_class)->finalize (object); +} + +static void +gst_sctp_association_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); + + g_mutex_lock (&self->association_mutex); + if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) { + switch (prop_id) { + case PROP_LOCAL_PORT: + case PROP_REMOTE_PORT: + g_warning ("These properties cannot be set in this state"); + goto error; + } + } + + switch (prop_id) { + case PROP_ASSOCIATION_ID: + self->association_id = g_value_get_uint (value); + break; + case PROP_LOCAL_PORT: + self->local_port = g_value_get_uint (value); + break; + case PROP_REMOTE_PORT: + self->remote_port = g_value_get_uint (value); + break; + case PROP_STATE: + self->state = g_value_get_enum (value); + break; + case PROP_USE_SOCK_STREAM: + self->use_sock_stream = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); + break; + } + + g_mutex_unlock (&self->association_mutex); + if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT) + maybe_set_state_to_ready (self); + + return; + +error: + g_mutex_unlock (&self->association_mutex); +} + +static void +maybe_set_state_to_ready (GstSctpAssociation * self) +{ + gboolean signal_ready_state = FALSE; + + g_mutex_lock (&self->association_mutex); + if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) && + (self->local_port != 0 && self->remote_port != 0) + && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) { + signal_ready_state = TRUE; + gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY, + FALSE); + } + g_mutex_unlock (&self->association_mutex); + + /* The reason the state is changed twice is that we do not want to change state with + * notification while the association_mutex is locked. If someone listens + * on property change and call this object a deadlock might occur.*/ + if (signal_ready_state) + gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY, + TRUE); + +} + +static void +gst_sctp_association_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); + + switch (prop_id) { + case PROP_ASSOCIATION_ID: + g_value_set_uint (value, self->association_id); + break; + case PROP_LOCAL_PORT: + g_value_set_uint (value, self->local_port); + break; + case PROP_REMOTE_PORT: + g_value_set_uint (value, self->remote_port); + break; + case PROP_STATE: + g_value_set_enum (value, self->state); + break; + case PROP_USE_SOCK_STREAM: + g_value_set_boolean (value, self->use_sock_stream); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); + break; + } +} + +/* Public functions */ + +GstSctpAssociation * +gst_sctp_association_get (guint32 association_id) +{ + GstSctpAssociation *association; + + G_LOCK (associations_lock); + if (!associations) { + associations = + g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, NULL); + } + + association = + g_hash_table_lookup (associations, GUINT_TO_POINTER (association_id)); + if (!association) { + association = + g_object_new (GST_SCTP_TYPE_ASSOCIATION, "association-id", + association_id, NULL); + g_hash_table_insert (associations, GUINT_TO_POINTER (association_id), + association); + } else { + g_object_ref (association); + } + G_UNLOCK (associations_lock); + return association; +} + +gboolean +gst_sctp_association_start (GstSctpAssociation * self) +{ + gchar *thread_name; + + g_mutex_lock (&self->association_mutex); + if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) { + g_warning ("SCTP association is in wrong state and cannot be started"); + goto configure_required; + } + + if ((self->sctp_ass_sock = create_sctp_socket (self)) == NULL) + goto error; + + gst_sctp_association_change_state (self, + GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE); + g_mutex_unlock (&self->association_mutex); + + /* The reason the state is changed twice is that we do not want to change state with + * notification while the association_mutex is locked. If someone listens + * on property change and call this object a deadlock might occur.*/ + gst_sctp_association_change_state (self, + GST_SCTP_ASSOCIATION_STATE_CONNECTING, TRUE); + + thread_name = g_strdup_printf ("connection_thread_%u", self->association_id); + self->connection_thread = g_thread_new (thread_name, + (GThreadFunc) connection_thread_func, self); + g_free (thread_name); + + return TRUE; +error: + g_mutex_unlock (&self->association_mutex); + gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR, + TRUE); +configure_required: + g_mutex_unlock (&self->association_mutex); + return FALSE; +} + +void +gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, + GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data) +{ + g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); + + g_mutex_lock (&self->association_mutex); + if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) { + self->packet_out_cb = packet_out_cb; + self->packet_out_user_data = user_data; + } else { + /* This is to be thread safe. The Association might try to write to the closure already */ + g_warning ("It is not possible to change packet callback in this state"); + } + g_mutex_unlock (&self->association_mutex); + + maybe_set_state_to_ready (self); +} + +void +gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, + GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data) +{ + g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); + + g_mutex_lock (&self->association_mutex); + if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) { + self->packet_received_cb = packet_received_cb; + self->packet_received_user_data = user_data; + } else { + /* This is to be thread safe. The Association might try to write to the closure already */ + g_warning ("It is not possible to change receive callback in this state"); + } + g_mutex_unlock (&self->association_mutex); + + maybe_set_state_to_ready (self); +} + +void +gst_sctp_association_incoming_packet (GstSctpAssociation * self, guint8 * buf, + guint32 length) +{ + usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0); +} + +gboolean +gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf, + guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered, + GstSctpAssociationPartialReliability pr, guint32 reliability_param) +{ + struct sctp_sendv_spa spa; + gint32 bytes_sent; + gboolean result = FALSE; + struct sockaddr_conn remote_addr; + + g_mutex_lock (&self->association_mutex); + if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) + goto end; + + memset (&spa, 0, sizeof (spa)); + + spa.sendv_sndinfo.snd_ppid = g_htonl (ppid); + spa.sendv_sndinfo.snd_sid = stream_id; + spa.sendv_sndinfo.snd_flags = ordered ? 0 : SCTP_UNORDERED; + spa.sendv_sndinfo.snd_context = 0; + spa.sendv_sndinfo.snd_assoc_id = 0; + spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; + if (pr != GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE) { + spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; + spa.sendv_prinfo.pr_value = g_htonl (reliability_param); + if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL) + spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; + else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX) + spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; + else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF) + spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_BUF; + } + + remote_addr = get_sctp_socket_address (self, self->remote_port); + bytes_sent = + usrsctp_sendv (self->sctp_ass_sock, buf, length, + (struct sockaddr *) &remote_addr, 1, (void *) &spa, + (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0); + if (bytes_sent < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* Resending this buffer is taken care of by the gstsctpenc */ + goto end; + } else { + g_warning ("Error sending data on stream %u: (%u) %s", stream_id, errno, + strerror (errno)); + goto end; + } + } + + result = TRUE; +end: + g_mutex_unlock (&self->association_mutex); + return result; +} + + +void +gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id) +{ + struct sctp_reset_streams *srs; + socklen_t length; + + length = (socklen_t) (sizeof (struct sctp_reset_streams) + sizeof (guint16)); + srs = (struct sctp_reset_streams *) g_malloc0 (length); + srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; + srs->srs_number_streams = 1; + srs->srs_stream_list[0] = stream_id; + + g_mutex_lock (&self->association_mutex); + usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS, + srs, length); + g_mutex_unlock (&self->association_mutex); + + g_free (srs); +} + +void +gst_sctp_association_force_close (GstSctpAssociation * self) +{ + g_mutex_lock (&self->association_mutex); + if (self->sctp_ass_sock) { + usrsctp_close (self->sctp_ass_sock); + self->sctp_ass_sock = NULL; + + } + g_mutex_unlock (&self->association_mutex); +} + +static struct socket * +create_sctp_socket (GstSctpAssociation * self) +{ + struct socket *sock; + struct linger l; + struct sctp_event event; + struct sctp_assoc_value stream_reset; + int value = 1; + guint16 event_types[] = { + SCTP_ASSOC_CHANGE, + SCTP_PEER_ADDR_CHANGE, + SCTP_REMOTE_ERROR, + SCTP_SEND_FAILED, + SCTP_SHUTDOWN_EVENT, + SCTP_ADAPTATION_INDICATION, + /*SCTP_PARTIAL_DELIVERY_EVENT, */ + /*SCTP_AUTHENTICATION_EVENT, */ + SCTP_STREAM_RESET_EVENT, + /*SCTP_SENDER_DRY_EVENT, */ + /*SCTP_NOTIFICATIONS_STOPPED_EVENT, */ + /*SCTP_ASSOC_RESET_EVENT, */ + SCTP_STREAM_CHANGE_EVENT + }; + guint32 i; + guint sock_type = self->use_sock_stream ? SOCK_STREAM : SOCK_SEQPACKET; + + if ((sock = + usrsctp_socket (AF_CONN, sock_type, IPPROTO_SCTP, receive_cb, NULL, 0, + (void *) self)) == NULL) + goto error; + + if (usrsctp_set_non_blocking (sock, 1) < 0) { + g_warning ("Could not set non-blocking mode on SCTP socket"); + goto error; + } + + memset (&l, 0, sizeof (l)); + l.l_onoff = 1; + l.l_linger = 0; + if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_LINGER, (const void *) &l, + (socklen_t) sizeof (struct linger)) < 0) { + g_warning ("Could not set SO_LINGER on SCTP socket"); + goto error; + } + + if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_NODELAY, &value, + sizeof (int))) { + g_warning ("Could not set SCTP_NODELAY"); + goto error; + } + + memset (&stream_reset, 0, sizeof (stream_reset)); + stream_reset.assoc_id = SCTP_ALL_ASSOC; + stream_reset.assoc_value = 1; + if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, + &stream_reset, sizeof (stream_reset))) { + g_warning ("Could not set SCTP_ENABLE_STREAM_RESET"); + goto error; + } + + memset (&event, 0, sizeof (event)); + event.se_assoc_id = SCTP_ALL_ASSOC; + event.se_on = 1; + for (i = 0; i < sizeof (event_types) / sizeof (event_types[0]); i++) { + event.se_type = event_types[i]; + if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EVENT, + &event, sizeof (event)) < 0) { + g_warning ("Failed to register event %u", event_types[i]); + } + } + + return sock; +error: + if (sock) { + usrsctp_close (sock); + g_warning ("Could not create socket. Error: (%u) %s", errno, + strerror (errno)); + errno = 0; + sock = NULL; + } + return NULL; +} + +static struct sockaddr_conn +get_sctp_socket_address (GstSctpAssociation * gst_sctp_association, + guint16 port) +{ + struct sockaddr_conn addr; + + memset ((void *) &addr, 0, sizeof (struct sockaddr_conn)); +#ifdef __APPLE__ + addr.sconn_len = sizeof (struct sockaddr_conn); +#endif + addr.sconn_family = AF_CONN; + addr.sconn_port = g_htons (port); + addr.sconn_addr = (void *) gst_sctp_association; + + return addr; +} + +static gpointer +connection_thread_func (GstSctpAssociation * self) +{ + /* TODO: Support both server and client role */ + client_role_connect (self); + return NULL; +} + +static gboolean +client_role_connect (GstSctpAssociation * self) +{ + struct sockaddr_conn addr; + gint ret; + + g_mutex_lock (&self->association_mutex); + addr = get_sctp_socket_address (self, self->local_port); + ret = + usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr, + sizeof (struct sockaddr_conn)); + if (ret < 0) { + g_warning ("usrsctp_bind() error: (%u) %s", errno, strerror (errno)); + goto error; + } + + addr = get_sctp_socket_address (self, self->remote_port); + ret = + usrsctp_connect (self->sctp_ass_sock, (struct sockaddr *) &addr, + sizeof (struct sockaddr_conn)); + if (ret < 0 && errno != EINPROGRESS) { + g_warning ("usrsctp_connect() error: (%u) %s", errno, strerror (errno)); + goto error; + } + g_mutex_unlock (&self->association_mutex); + return TRUE; +error: + g_mutex_unlock (&self->association_mutex); + return FALSE; +} + +static int +sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos, + guint8 set_df) +{ + GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr); + + if (self->packet_out_cb) { + self->packet_out_cb (self, buffer, length, self->packet_out_user_data); + } + + return 0; +} + +static int +receive_cb (struct socket *sock, union sctp_sockstore addr, void *data, + size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info) +{ + GstSctpAssociation *self = GST_SCTP_ASSOCIATION (ulp_info); + + if (!data) { + /* Not sure if this can happend. */ + g_warning ("Received empty data buffer"); + } else { + if (flags & MSG_NOTIFICATION) { + handle_notification (self, (const union sctp_notification *) data, + datalen); + free (data); + } else { + handle_message (self, data, datalen, rcv_info.rcv_sid, + ntohl (rcv_info.rcv_ppid)); + } + } + + return 1; +} + +static void +handle_notification (GstSctpAssociation * self, + const union sctp_notification *notification, size_t length) +{ + g_assert (notification->sn_header.sn_length == length); + + switch (notification->sn_header.sn_type) { + case SCTP_ASSOC_CHANGE: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_ASSOC_CHANGE"); + handle_association_changed (self, ¬ification->sn_assoc_change); + break; + case SCTP_PEER_ADDR_CHANGE: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_PEER_ADDR_CHANGE"); + break; + case SCTP_REMOTE_ERROR: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_REMOTE_ERROR"); + break; + case SCTP_SEND_FAILED: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SEND_FAILED"); + break; + case SCTP_SHUTDOWN_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SHUTDOWN_EVENT"); + break; + case SCTP_ADAPTATION_INDICATION: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, + "Event: SCTP_ADAPTATION_INDICATION"); + break; + case SCTP_PARTIAL_DELIVERY_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, + "Event: SCTP_PARTIAL_DELIVERY_EVENT"); + break; + case SCTP_AUTHENTICATION_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, + "Event: SCTP_AUTHENTICATION_EVENT"); + break; + case SCTP_STREAM_RESET_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_STREAM_RESET_EVENT"); + handle_stream_reset_event (self, ¬ification->sn_strreset_event); + break; + case SCTP_SENDER_DRY_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SENDER_DRY_EVENT"); + break; + case SCTP_NOTIFICATIONS_STOPPED_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, + "Event: SCTP_NOTIFICATIONS_STOPPED_EVENT"); + break; + case SCTP_ASSOC_RESET_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_ASSOC_RESET_EVENT"); + break; + case SCTP_STREAM_CHANGE_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_STREAM_CHANGE_EVENT"); + break; + case SCTP_SEND_FAILED_EVENT: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SEND_FAILED_EVENT"); + break; + default: + break; + } +} + +static void +handle_association_changed (GstSctpAssociation * self, + const struct sctp_assoc_change *sac) +{ + gboolean change_state = FALSE; + GstSctpAssociationState new_state; + + switch (sac->sac_state) { + case SCTP_COMM_UP: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP_COMM_UP()"); + g_mutex_lock (&self->association_mutex); + if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) { + change_state = TRUE; + new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED; + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP association connected!"); + } else if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTED) { + g_warning ("SCTP association already open"); + } else { + g_warning ("SCTP association in unexpected state"); + } + g_mutex_unlock (&self->association_mutex); + break; + case SCTP_COMM_LOST: + g_warning ("SCTP event SCTP_COMM_LOST received"); + /* TODO: Tear down association and signal that this has happend */ + break; + case SCTP_RESTART: + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, + "SCTP event SCTP_RESTART received"); + break; + case SCTP_SHUTDOWN_COMP: + g_warning ("SCTP event SCTP_SHUTDOWN_COMP received"); + /* TODO: Tear down association and signal that this has happend */ + break; + case SCTP_CANT_STR_ASSOC: + g_warning ("SCTP event SCTP_CANT_STR_ASSOC received"); + break; + } + + if (change_state) + gst_sctp_association_change_state (self, new_state, TRUE); +} + +static void +handle_stream_reset_event (GstSctpAssociation * self, + const struct sctp_stream_reset_event *sr) +{ + guint32 i, n; + if (!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED) && + !(sr->strreset_flags & SCTP_STREAM_RESET_DENIED)) { + n = (sr->strreset_length - + sizeof (struct sctp_stream_reset_event)) / sizeof (uint16_t); + for (i = 0; i < n; i++) { + if (sr->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { + g_signal_emit (self, signals[SIGNAL_STREAM_RESET], 0, + sr->strreset_stream_list[i]); + } + } + } +} + +static void +handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen, + guint16 stream_id, guint32 ppid) +{ + if (self->packet_received_cb) { + self->packet_received_cb (self, data, datalen, stream_id, ppid, + self->packet_received_user_data); + } +} + +static void +gst_sctp_association_change_state (GstSctpAssociation * self, + GstSctpAssociationState new_state, gboolean notify) +{ + self->state = new_state; + if (notify) + g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]); +} diff --git a/ext/sctp/sctpassociation.h b/ext/sctp/sctpassociation.h new file mode 100644 index 000000000..4510ba70f --- /dev/null +++ b/ext/sctp/sctpassociation.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2015, Collabora Ltd. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this + * list of conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifndef __GST_SCTP_ASSOCIATION_H__ +#define __GST_SCTP_ASSOCIATION_H__ + +#include <glib-object.h> +#define INET +#define INET6 +#include <usrsctp.h> + +G_BEGIN_DECLS + +#define GST_SCTP_TYPE_ASSOCIATION (gst_sctp_association_get_type ()) +#define GST_SCTP_ASSOCIATION(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_SCTP_TYPE_ASSOCIATION, GstSctpAssociation)) +#define GST_SCTP_IS_ASSOCIATION(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_SCTP_TYPE_ASSOCIATION)) +#define GST_SCTP_ASSOCIATION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_SCTP_TYPE_ASSOCIATION, GstSctpAssociationClass)) +#define GST_SCTP_IS_ASSOCIATION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_SCTP_TYPE_ASSOCIATION)) +#define GST_SCTP_ASSOCIATION_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_SCTP_TYPE_ASSOCIATION, GstSctpAssociationClass)) + +typedef struct _GstSctpAssociation GstSctpAssociation; +typedef struct _GstSctpAssociationClass GstSctpAssociationClass; + +typedef enum +{ + GST_SCTP_ASSOCIATION_STATE_NEW, + GST_SCTP_ASSOCIATION_STATE_READY, + GST_SCTP_ASSOCIATION_STATE_CONNECTING, + GST_SCTP_ASSOCIATION_STATE_CONNECTED, + GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, + GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, + GST_SCTP_ASSOCIATION_STATE_ERROR +} GstSctpAssociationState; + +typedef enum +{ + GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE = 0x0000, + GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL = 0x0001, + GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF = 0x0002, + GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX = 0x0003 +} GstSctpAssociationPartialReliability; + +typedef void (*GstSctpAssociationPacketReceivedCb) (GstSctpAssociation * + sctp_association, guint8 * data, gsize length, guint16 stream_id, + guint ppid, gpointer user_data); +typedef void (*GstSctpAssociationPacketOutCb) (GstSctpAssociation * + sctp_association, const guint8 * data, gsize length, gpointer user_data); + +struct _GstSctpAssociation +{ + GObject parent_instance; + + guint32 association_id; + guint16 local_port; + guint16 remote_port; + gboolean use_sock_stream; + struct socket *sctp_ass_sock; + + GMutex association_mutex; + + GstSctpAssociationState state; + + GThread *connection_thread; + + GstSctpAssociationPacketReceivedCb packet_received_cb; + gpointer packet_received_user_data; + + GstSctpAssociationPacketOutCb packet_out_cb; + gpointer packet_out_user_data; +}; + +struct _GstSctpAssociationClass +{ + GObjectClass parent_class; + + void (*on_sctp_stream_reset) (GstSctpAssociation * sctp_association, + guint16 stream_id); +}; + +GType gst_sctp_association_get_type (void); + +GstSctpAssociation *gst_sctp_association_get (guint32 association_id); + +gboolean gst_sctp_association_start (GstSctpAssociation * self); +void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, + GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data); +void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, + GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data); +void gst_sctp_association_incoming_packet (GstSctpAssociation * self, + guint8 * buf, guint32 length); +gboolean gst_sctp_association_send_data (GstSctpAssociation * self, + guint8 * buf, guint32 length, guint16 stream_id, guint32 ppid, + gboolean ordered, GstSctpAssociationPartialReliability pr, + guint32 reliability_param); +void gst_sctp_association_reset_stream (GstSctpAssociation * self, + guint16 stream_id); +void gst_sctp_association_force_close (GstSctpAssociation * self); + +G_END_DECLS + +#endif /* __GST_SCTP_ASSOCIATION_H__ */ |