summaryrefslogtreecommitdiff
path: root/ext/sctp/gstsctpdec.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/sctp/gstsctpdec.c')
-rw-r--r--ext/sctp/gstsctpdec.c655
1 files changed, 655 insertions, 0 deletions
diff --git a/ext/sctp/gstsctpdec.c b/ext/sctp/gstsctpdec.c
new file mode 100644
index 000000000..ccc5ec12f
--- /dev/null
+++ b/ext/sctp/gstsctpdec.c
@@ -0,0 +1,655 @@
+/*
+ * Copyright (c) 2015, Collabora Ltd.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this
+ * list of conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "gstsctpdec.h"
+
+#include <gst/sctp/sctpreceivemeta.h>
+#include <gst/base/gstdataqueue.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_sctp_dec_debug_category);
+#define GST_CAT_DEFAULT gst_sctp_dec_debug_category
+
+#define gst_sctp_dec_parent_class parent_class
+G_DEFINE_TYPE (GstSctpDec, gst_sctp_dec, GST_TYPE_ELEMENT);
+
+static GstStaticPadTemplate sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK,
+ GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
+
+static GstStaticPadTemplate src_template =
+GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC,
+ GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY);
+
+enum
+{
+ SIGNAL_RESET_STREAM,
+ NUM_SIGNALS
+};
+
+static guint signals[NUM_SIGNALS];
+
+enum
+{
+ PROP_0,
+
+ PROP_GST_SCTP_ASSOCIATION_ID,
+ PROP_LOCAL_SCTP_PORT,
+
+ NUM_PROPERTIES
+};
+
+static GParamSpec *properties[NUM_PROPERTIES];
+
+#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
+#define DEFAULT_LOCAL_SCTP_PORT 0
+#define MAX_SCTP_PORT 65535
+#define MAX_GST_SCTP_ASSOCIATION_ID 65535
+#define MAX_STREAM_ID 65535
+
+GType gst_sctp_dec_pad_get_type (void);
+
+#define GST_TYPE_SCTP_DEC_PAD (gst_sctp_dec_pad_get_type())
+#define GST_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPad))
+#define GST_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPadClass))
+#define GST_IS_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC_PAD))
+#define GST_IS_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC_PAD))
+
+typedef struct _GstSctpDecPad GstSctpDecPad;
+typedef GstPadClass GstSctpDecPadClass;
+
+struct _GstSctpDecPad
+{
+ GstPad parent;
+
+ GstDataQueue *packet_queue;
+};
+
+G_DEFINE_TYPE (GstSctpDecPad, gst_sctp_dec_pad, GST_TYPE_PAD);
+
+static void
+gst_sctp_dec_pad_finalize (GObject * object)
+{
+ GstSctpDecPad *self = GST_SCTP_DEC_PAD (object);
+
+ gst_object_unref (self->packet_queue);
+
+ G_OBJECT_CLASS (gst_sctp_dec_pad_parent_class)->finalize (object);
+}
+
+static gboolean
+data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
+ guint64 time, gpointer user_data)
+{
+ /* FIXME: Are we full at some point and block? */
+ return FALSE;
+}
+
+static void
+data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
+{
+}
+
+static void
+data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
+{
+}
+
+static void
+gst_sctp_dec_pad_class_init (GstSctpDecPadClass * klass)
+{
+ GObjectClass *gobject_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->finalize = gst_sctp_dec_pad_finalize;
+}
+
+static void
+gst_sctp_dec_pad_init (GstSctpDecPad * self)
+{
+ self->packet_queue = gst_data_queue_new (data_queue_check_full_cb,
+ data_queue_full_cb, data_queue_empty_cb, NULL);
+}
+
+static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
+ GstStateChange transition);
+static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
+ GstBuffer * buf);
+static gboolean gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self,
+ GstEvent * event);
+static void gst_sctp_data_srcpad_loop (GstPad * pad);
+
+static gboolean configure_association (GstSctpDec * self);
+static void on_gst_sctp_association_stream_reset (GstSctpAssociation *
+ gst_sctp_association, guint16 stream_id, GstSctpDec * self);
+static void on_receive (GstSctpAssociation * gst_sctp_association, guint8 * buf,
+ gsize length, guint16 stream_id, guint ppid, gpointer user_data);
+static void stop_srcpad_task (GstPad * pad);
+static void stop_all_srcpad_tasks (GstSctpDec * self);
+static void sctpdec_cleanup (GstSctpDec * self);
+static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
+static void remove_pad (GstElement * element, GstPad * pad);
+static void on_reset_stream (GstSctpDec * self, guint stream_id);
+
+static void
+gst_sctp_dec_class_init (GstSctpDecClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *element_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+ element_class = GST_ELEMENT_CLASS (klass);
+
+ GST_DEBUG_CATEGORY_INIT (gst_sctp_dec_debug_category,
+ "sctpdec", 0, "debug category for sctpdec element");
+
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&sink_template));
+
+ gobject_class->set_property = gst_sctp_dec_set_property;
+ gobject_class->get_property = gst_sctp_dec_get_property;
+
+ element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
+
+ klass->on_reset_stream = on_reset_stream;
+
+ properties[PROP_GST_SCTP_ASSOCIATION_ID] =
+ g_param_spec_uint ("sctp-association-id",
+ "SCTP Association ID",
+ "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
+ "This value must be set before any pads are requested.",
+ 0, MAX_GST_SCTP_ASSOCIATION_ID, DEFAULT_GST_SCTP_ASSOCIATION_ID,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ properties[PROP_LOCAL_SCTP_PORT] =
+ g_param_spec_uint ("local-sctp-port",
+ "Local SCTP port",
+ "Local sctp port for the sctp association. The remote port is configured via the "
+ "GstSctpEnc element.",
+ 0, MAX_SCTP_PORT, DEFAULT_LOCAL_SCTP_PORT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
+
+ g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
+
+ signals[SIGNAL_RESET_STREAM] = g_signal_new ("reset-stream",
+ G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstSctpDecClass, on_reset_stream), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT);
+
+ gst_element_class_set_static_metadata (element_class,
+ "SCTP Decoder",
+ "Decoder/Network/SCTP",
+ "Decodes packets with SCTP",
+ "George Kiagiadakis <george.kiagiadakis@collabora.com>");
+}
+
+static void
+gst_sctp_dec_init (GstSctpDec * self)
+{
+ self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
+ self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
+
+ self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
+ gst_pad_set_chain_function (self->sink_pad,
+ GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
+ gst_pad_set_event_function (self->sink_pad,
+ GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_packet_event));
+
+ gst_element_add_pad (GST_ELEMENT (self), self->sink_pad);
+}
+
+static void
+gst_sctp_dec_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstSctpDec *self = GST_SCTP_DEC (object);
+
+ switch (prop_id) {
+ case PROP_GST_SCTP_ASSOCIATION_ID:
+ self->sctp_association_id = g_value_get_uint (value);
+ break;
+ case PROP_LOCAL_SCTP_PORT:
+ self->local_sctp_port = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
+ GParamSpec * pspec)
+{
+ GstSctpDec *self = GST_SCTP_DEC (object);
+
+ switch (prop_id) {
+ case PROP_GST_SCTP_ASSOCIATION_ID:
+ g_value_set_uint (value, self->sctp_association_id);
+ break;
+ case PROP_LOCAL_SCTP_PORT:
+ g_value_set_uint (value, self->local_sctp_port);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
+{
+ GstSctpDec *self = GST_SCTP_DEC (element);
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ if (!configure_association (self))
+ ret = GST_STATE_CHANGE_FAILURE;
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ sctpdec_cleanup (self);
+ break;
+ default:
+ break;
+ }
+
+ if (ret != GST_STATE_CHANGE_FAILURE)
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ return ret;
+}
+
+static GstFlowReturn
+gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
+{
+ GstMapInfo map;
+
+ if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
+ GST_WARNING_OBJECT (self, "Could not map GstBuffer");
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+
+ gst_sctp_association_incoming_packet (self->sctp_association,
+ (guint8 *) map.data, (guint32) map.size);
+ gst_buffer_unmap (buf, &map);
+ gst_buffer_unref (buf);
+
+ return GST_FLOW_OK;
+}
+
+static void
+flush_srcpad (const GValue * item, gpointer user_data)
+{
+ GstSctpDecPad *sctpdec_pad = g_value_get_object (item);
+ gboolean flush = GPOINTER_TO_INT (user_data);
+
+ if (flush) {
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+ } else {
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
+ gst_pad_start_task (GST_PAD (sctpdec_pad),
+ (GstTaskFunction) gst_sctp_data_srcpad_loop, sctpdec_pad, NULL);
+ }
+}
+
+static gboolean
+gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
+{
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_STREAM_START:
+ case GST_EVENT_CAPS:
+ /* We create our own stream-start events and the caps event does not
+ * make sense */
+ gst_event_unref (event);
+ return TRUE;
+ case GST_EVENT_EOS:
+ /* Drop this, we're never EOS until shut down */
+ gst_event_unref (event);
+ return TRUE;
+ case GST_EVENT_FLUSH_START:{
+ GstIterator *it;
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, flush_srcpad,
+ GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ case GST_EVENT_FLUSH_STOP:{
+ GstIterator *it;
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, flush_srcpad,
+ GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ default:
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+}
+
+static void
+gst_sctp_data_srcpad_loop (GstPad * pad)
+{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+ GstDataQueueItem *item;
+
+ if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
+ GstFlowReturn flow_ret;
+
+ flow_ret = gst_pad_push (pad, GST_BUFFER (item->object));
+ item->object = NULL;
+ if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
+ || flow_ret == GST_FLOW_NOT_LINKED)) {
+ GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
+ gst_flow_get_name (flow_ret));
+ } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
+ GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
+ gst_flow_get_name (flow_ret));
+ }
+
+ if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
+ GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+ gst_pad_pause_task (pad);
+ }
+
+ item->destroy (item);
+ } else {
+ GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
+ gst_pad_pause_task (pad);
+ }
+}
+
+static gboolean
+configure_association (GstSctpDec * self)
+{
+ gint state;
+
+ self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
+
+ g_object_get (self->sctp_association, "state", &state, NULL);
+
+ if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
+ GST_WARNING_OBJECT (self,
+ "Could not configure SCTP association. Association already in use!");
+ g_object_unref (self->sctp_association);
+ self->sctp_association = NULL;
+ goto error;
+ }
+
+ self->signal_handler_stream_reset =
+ g_signal_connect_object (self->sctp_association, "stream-reset",
+ G_CALLBACK (on_gst_sctp_association_stream_reset), self, 0);
+
+ g_object_bind_property (self, "local-sctp-port", self->sctp_association,
+ "local-port", G_BINDING_SYNC_CREATE);
+
+ gst_sctp_association_set_on_packet_received (self->sctp_association,
+ on_receive, self);
+
+ return TRUE;
+error:
+ return FALSE;
+}
+
+static gboolean
+gst_sctp_dec_src_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
+{
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_RECONFIGURE:
+ case GST_EVENT_FLUSH_STOP:{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+
+ /* Unflush and start task again */
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
+ gst_pad_start_task (pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, pad,
+ NULL);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ case GST_EVENT_FLUSH_START:{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+ default:
+ return gst_pad_event_default (pad, GST_OBJECT (self), event);
+ }
+}
+
+static gboolean
+copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+ GstPad *new_pad = user_data;
+
+ if (GST_EVENT_TYPE (*event) != GST_EVENT_CAPS
+ && GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START)
+ gst_pad_store_sticky_event (new_pad, *event);
+
+ return TRUE;
+}
+
+static GstPad *
+get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
+{
+ GstPad *new_pad = NULL;
+ gint state;
+ gchar *pad_name, *pad_stream_id;
+ GstPadTemplate *template;
+
+ pad_name = g_strdup_printf ("src_%hu", stream_id);
+ new_pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
+ if (new_pad) {
+ g_free (pad_name);
+ return new_pad;
+ }
+
+ g_object_get (self->sctp_association, "state", &state, NULL);
+
+ if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
+ GST_WARNING_OBJECT (self,
+ "The SCTP association must be established before a new stream can be created");
+ return NULL;
+ }
+
+ if (stream_id > MAX_STREAM_ID)
+ return NULL;
+
+ template = gst_static_pad_template_get (&src_template);
+ new_pad = g_object_new (GST_TYPE_SCTP_DEC_PAD, "name", pad_name,
+ "direction", template->direction, "template", template, NULL);
+ g_free (pad_name);
+
+ gst_pad_set_event_function (new_pad,
+ GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_src_event));
+
+ if (!gst_pad_set_active (new_pad, TRUE))
+ goto error_cleanup;
+
+ pad_stream_id =
+ gst_pad_create_stream_id_printf (new_pad, GST_ELEMENT (self), "%hu",
+ stream_id);
+ gst_pad_push_event (new_pad, gst_event_new_stream_start (pad_stream_id));
+ g_free (pad_stream_id);
+ gst_pad_sticky_events_foreach (self->sink_pad, copy_sticky_events, new_pad);
+
+ if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
+ goto error_cleanup;
+
+ gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
+ new_pad, NULL);
+
+ gst_object_ref (new_pad);
+
+ return new_pad;
+
+error_cleanup:
+ gst_object_unref (new_pad);
+ return NULL;
+}
+
+static void
+remove_pad (GstElement * element, GstPad * pad)
+{
+ stop_srcpad_task (pad);
+ gst_pad_set_active (pad, FALSE);
+ gst_element_remove_pad (element, pad);
+}
+
+static void
+on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
+ guint16 stream_id, GstSctpDec * self)
+{
+ gchar *pad_name;
+ GstPad *srcpad;
+
+ pad_name = g_strdup_printf ("src_%hu", stream_id);
+ srcpad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
+ g_free (pad_name);
+ if (!srcpad) {
+ GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
+ return;
+ }
+ remove_pad (GST_ELEMENT (self), srcpad);
+ gst_object_unref (srcpad);
+}
+
+static void
+data_queue_item_free (GstDataQueueItem * item)
+{
+ if (item->object)
+ gst_mini_object_unref (item->object);
+ g_free (item);
+}
+
+static void
+on_receive (GstSctpAssociation * sctp_association, guint8 * buf, gsize length,
+ guint16 stream_id, guint ppid, gpointer user_data)
+{
+ GstSctpDec *self = user_data;
+ GstSctpDecPad *sctpdec_pad;
+ GstPad *src_pad;
+ GstDataQueueItem *item;
+ GstBuffer *gstbuf;
+
+ src_pad = get_pad_for_stream_id (self, stream_id);
+ g_assert (src_pad);
+
+ sctpdec_pad = GST_SCTP_DEC_PAD (src_pad);
+ gstbuf = gst_buffer_new_wrapped (buf, length);
+ gst_sctp_buffer_add_receive_meta (gstbuf, ppid);
+
+ item = g_new0 (GstDataQueueItem, 1);
+ item->object = GST_MINI_OBJECT (gstbuf);
+ item->size = length;
+ item->visible = TRUE;
+ item->destroy = (GDestroyNotify) data_queue_item_free;
+ if (!gst_data_queue_push (sctpdec_pad->packet_queue, item)) {
+ item->destroy (item);
+ GST_DEBUG_OBJECT (src_pad, "Failed to push item because we're flushing");
+ }
+
+ gst_object_unref (src_pad);
+}
+
+static void
+stop_srcpad_task (GstPad * pad)
+{
+ GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
+
+ gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
+ gst_data_queue_flush (sctpdec_pad->packet_queue);
+ gst_pad_stop_task (pad);
+}
+
+static void
+remove_pad_it (const GValue * item, gpointer user_data)
+{
+ GstPad *pad = g_value_get_object (item);
+ GstSctpDec *self = user_data;
+
+ remove_pad (GST_ELEMENT (self), pad);
+}
+
+static void
+stop_all_srcpad_tasks (GstSctpDec * self)
+{
+ GstIterator *it;
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT (self));
+ while (gst_iterator_foreach (it, remove_pad_it, self) == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ gst_iterator_free (it);
+}
+
+static void
+sctpdec_cleanup (GstSctpDec * self)
+{
+ if (self->sctp_association) {
+ gst_sctp_association_set_on_packet_received (self->sctp_association, NULL,
+ NULL);
+ g_signal_handler_disconnect (self->sctp_association,
+ self->signal_handler_stream_reset);
+ stop_all_srcpad_tasks (self);
+ gst_sctp_association_force_close (self->sctp_association);
+ g_object_unref (self->sctp_association);
+ self->sctp_association = NULL;
+ }
+}
+
+static void
+on_reset_stream (GstSctpDec * self, guint stream_id)
+{
+ if (self->sctp_association) {
+ gst_sctp_association_reset_stream (self->sctp_association, stream_id);
+ on_gst_sctp_association_stream_reset (self->sctp_association, stream_id,
+ self);
+ }
+}