summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHoonHee Lee <hoonhee.lee@lge.com>2014-03-04 19:40:05 +0900
committerSebastian Dröge <sebastian@centricular.com>2015-03-12 14:42:18 +0000
commitfadabe8b787d53c90dc06e1fcdaff3f2f04e990c (patch)
tree8a041c399f00bd64c243741810d1531675184410
parent5143b835cc58a0353f0f47623b53c1c835efa618 (diff)
streamiddemux: Add streamiddemux element
Demultiplex a stream to multiple source pads based on the stream ids from the stream-start events. This basically reverses the behaviour of funnel. https://bugzilla.gnome.org/show_bug.cgi?id=707605
-rw-r--r--configure.ac1
-rw-r--r--plugins/elements/Makefile.am2
-rw-r--r--plugins/elements/gstelements.c5
-rw-r--r--plugins/elements/gststreamiddemux.c401
-rw-r--r--plugins/elements/gststreamiddemux.h66
-rw-r--r--tests/check/Makefile.am1
-rw-r--r--tests/check/elements/streamiddemux.c514
-rw-r--r--tests/examples/Makefile.am1
-rw-r--r--tests/examples/streamiddemux/Makefile.am6
-rw-r--r--tests/examples/streamiddemux/streamiddemux-stream.c241
10 files changed, 1238 insertions, 0 deletions
diff --git a/configure.ac b/configure.ac
index e637239df..9e4582662 100644
--- a/configure.ac
+++ b/configure.ac
@@ -839,6 +839,7 @@ tests/examples/memory/Makefile
tests/examples/metadata/Makefile
tests/examples/netclock/Makefile
tests/examples/queue/Makefile
+tests/examples/streamiddemux/Makefile
tests/examples/streams/Makefile
tests/examples/typefind/Makefile
tools/Makefile
diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am
index 6d3857cc7..ee3c9fb2e 100644
--- a/plugins/elements/Makefile.am
+++ b/plugins/elements/Makefile.am
@@ -24,6 +24,7 @@ libgstcoreelements_la_SOURCES = \
gstsparsefile.c \
gsttee.c \
gsttypefindelement.c \
+ gststreamiddemux.c \
gstvalve.c
libgstcoreelements_la_CFLAGS = $(GST_OBJ_CFLAGS)
@@ -54,6 +55,7 @@ noinst_HEADERS = \
gstsparsefile.h \
gsttee.h \
gsttypefindelement.h \
+ gststreamiddemux.h \
gstvalve.h
EXTRA_DIST = gstfdsrc.c \
diff --git a/plugins/elements/gstelements.c b/plugins/elements/gstelements.c
index 2b1781da7..5c0785579 100644
--- a/plugins/elements/gstelements.c
+++ b/plugins/elements/gstelements.c
@@ -46,6 +46,7 @@
#include "gsttee.h"
#include "gsttypefindelement.h"
#include "gstvalve.h"
+#include "gststreamiddemux.h"
static gboolean
plugin_init (GstPlugin * plugin)
@@ -109,6 +110,10 @@ plugin_init (GstPlugin * plugin)
gst_valve_get_type ()))
return FALSE;
+ if (!gst_element_register (plugin, "streamiddemux", GST_RANK_PRIMARY,
+ gst_streamid_demux_get_type ()))
+ return FALSE;
+
return TRUE;
}
diff --git a/plugins/elements/gststreamiddemux.c b/plugins/elements/gststreamiddemux.c
new file mode 100644
index 000000000..8528acb5e
--- /dev/null
+++ b/plugins/elements/gststreamiddemux.c
@@ -0,0 +1,401 @@
+/* GStreamer streamiddemux element
+ *
+ * Copyright 2013 LGE Corporation.
+ * @author: Hoonhee Lee <hoonhee.lee@lge.com>
+ * @author: Jeongseok Kim <jeongseok.kim@lge.com>
+ * @author: Wonchul Lee <wonchul86.lee@lge.com>
+ *
+ * gststreamiddemux.c: Simple stream-id-demultiplexer element
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * SECTION:element-streamid-demux
+ * @see_also: #GstFunnel
+ *
+ * Direct input stream to one out of N output pads by stream-id.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+
+#include "gststreamiddemux.h"
+
+GST_DEBUG_CATEGORY_STATIC (streamid_demux_debug);
+#define GST_CAT_DEFAULT streamid_demux_debug
+
+enum
+{
+ PROP_0,
+ PROP_ACTIVE_PAD,
+ PROP_LAST
+};
+
+static GstStaticPadTemplate gst_streamid_demux_sink_factory =
+GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+static GstStaticPadTemplate gst_streamid_demux_src_factory =
+GST_STATIC_PAD_TEMPLATE ("src_%u",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS_ANY);
+
+#define _do_init \
+GST_DEBUG_CATEGORY_INIT (streamid_demux_debug, \
+ "streamiddemux", 0, "Streamid demuxer");
+#define gst_streamid_demux_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstStreamidDemux, gst_streamid_demux,
+ GST_TYPE_ELEMENT, _do_init);
+
+static void gst_streamid_demux_dispose (GObject * object);
+static void gst_streamid_demux_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static GstFlowReturn gst_streamid_demux_chain (GstPad * pad,
+ GstObject * parent, GstBuffer * buf);
+static gboolean gst_streamid_demux_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static GstStateChangeReturn gst_streamid_demux_change_state (GstElement *
+ element, GstStateChange transition);
+static GstPad *gst_streamid_demux_get_srcpad_by_stream_id (GstStreamidDemux *
+ demux, const gchar * stream_id);
+static gboolean gst_streamid_demux_srcpad_create (GstStreamidDemux * demux,
+ GstPad * pad, const gchar * stream_id);
+static void gst_streamid_demux_reset (GstStreamidDemux * demux);
+static void gst_streamid_demux_release_srcpad (const GValue * item,
+ GstStreamidDemux * demux);
+
+static void
+gst_streamid_demux_class_init (GstStreamidDemuxClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+
+ gobject_class->get_property = gst_streamid_demux_get_property;
+ gobject_class->dispose = gst_streamid_demux_dispose;
+
+ g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
+ g_param_spec_object ("active-pad", "Active pad",
+ "The currently active src pad", GST_TYPE_PAD,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_set_static_metadata (gstelement_class, "Streamid Demux",
+ "Generic", "1-to-N output stream by stream-id",
+ "HoonHee Lee <hoonhee.lee@lge.com>");
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&gst_streamid_demux_sink_factory));
+
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&gst_streamid_demux_src_factory));
+
+ gstelement_class->change_state = gst_streamid_demux_change_state;
+}
+
+static void
+gst_streamid_demux_init (GstStreamidDemux * demux)
+{
+ demux->sinkpad =
+ gst_pad_new_from_static_template (&gst_streamid_demux_sink_factory,
+ "sink");
+ gst_pad_set_chain_function (demux->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_streamid_demux_chain));
+ gst_pad_set_event_function (demux->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_streamid_demux_event));
+
+ gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
+
+ /* srcpad management */
+ demux->active_srcpad = NULL;
+ demux->nb_srcpads = 0;
+
+ /* initialize hash table for srcpad */
+ demux->stream_id_pairs =
+ g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify) g_free,
+ (GDestroyNotify) gst_object_unref);
+}
+
+static void
+gst_streamid_demux_dispose (GObject * object)
+{
+ GstStreamidDemux *demux = GST_STREAMID_DEMUX (object);
+
+ gst_streamid_demux_reset (demux);
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
+gst_streamid_demux_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstStreamidDemux *demux = GST_STREAMID_DEMUX (object);
+
+ switch (prop_id) {
+ case PROP_ACTIVE_PAD:
+ GST_OBJECT_LOCK (demux);
+ g_value_set_object (value, demux->active_srcpad);
+ GST_OBJECT_UNLOCK (demux);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+ GstPad *srcpad = GST_PAD_CAST (user_data);
+
+ gst_pad_push_event (srcpad, gst_event_ref (*event));
+
+ return TRUE;
+}
+
+static gboolean
+gst_streamid_demux_srcpad_create (GstStreamidDemux * demux, GstPad * pad,
+ const gchar * stream_id)
+{
+ gchar *padname = NULL;
+ GstPad *srcpad = NULL;
+ GstPadTemplate *pad_tmpl = NULL;
+
+ padname = g_strdup_printf ("src_%u", demux->nb_srcpads++);
+ pad_tmpl = gst_static_pad_template_get (&gst_streamid_demux_src_factory);
+
+ GST_LOG_OBJECT (demux, "generating a srcpad:%s", padname);
+ srcpad = gst_pad_new_from_template (pad_tmpl, padname);
+ gst_object_unref (pad_tmpl);
+ g_free (padname);
+ g_return_val_if_fail (srcpad != NULL, FALSE);
+
+ demux->active_srcpad = srcpad;
+ g_hash_table_insert (demux->stream_id_pairs, g_strdup (stream_id),
+ gst_object_ref (srcpad));
+
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_streamid_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+ GstFlowReturn res = GST_FLOW_OK;
+ GstStreamidDemux *demux = NULL;
+ GstPad *srcpad = NULL;
+
+ demux = GST_STREAMID_DEMUX (parent);
+
+ GST_LOG_OBJECT (demux, "pushing buffer to %" GST_PTR_FORMAT,
+ demux->active_srcpad);
+
+ GST_OBJECT_LOCK (demux);
+ if (demux->active_srcpad) {
+ srcpad = gst_object_ref (demux->active_srcpad);
+ GST_OBJECT_UNLOCK (demux);
+ res = gst_pad_push (srcpad, buf);
+ gst_object_unref (srcpad);
+ } else {
+ GST_OBJECT_UNLOCK (demux);
+ goto no_active_srcpad;
+ }
+
+ GST_LOG_OBJECT (demux, "handled buffer %s", gst_flow_get_name (res));
+ return res;
+
+/* ERROR */
+no_active_srcpad:
+ {
+ GST_WARNING_OBJECT (demux, "srcpad is not initialized");
+ return GST_FLOW_NOT_NEGOTIATED;
+ }
+}
+
+static GstPad *
+gst_streamid_demux_get_srcpad_by_stream_id (GstStreamidDemux * demux,
+ const gchar * stream_id)
+{
+ GstPad *srcpad = NULL;
+
+ GST_DEBUG_OBJECT (demux, "stream_id = %s", stream_id);
+ if (demux->stream_id_pairs == NULL || stream_id == NULL) {
+ goto done;
+ }
+
+ srcpad = g_hash_table_lookup (demux->stream_id_pairs, stream_id);
+
+ if (srcpad) {
+ GST_DEBUG_OBJECT (demux, "srcpad = %s:%s matched",
+ GST_DEBUG_PAD_NAME (srcpad));
+ }
+
+done:
+ return srcpad;
+}
+
+static gboolean
+gst_streamid_demux_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ gboolean res = TRUE;
+ GstStreamidDemux *demux;
+ const gchar *stream_id = NULL;
+ GstPad *active_srcpad = NULL;
+
+ demux = GST_STREAMID_DEMUX (parent);
+
+ GST_DEBUG_OBJECT (demux, "event = %s, sticky = %d",
+ GST_EVENT_TYPE_NAME (event), GST_EVENT_IS_STICKY (event));
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
+ gst_event_parse_stream_start (event, &stream_id);
+ if (!stream_id)
+ goto no_stream_id;
+
+ GST_OBJECT_LOCK (demux);
+ active_srcpad =
+ gst_streamid_demux_get_srcpad_by_stream_id (demux, stream_id);
+ if (!active_srcpad) {
+ /* try to generate a srcpad */
+ if (gst_streamid_demux_srcpad_create (demux, pad, stream_id)) {
+ GST_OBJECT_UNLOCK (demux);
+
+ gst_pad_set_active (demux->active_srcpad, TRUE);
+ /* Forward sticky events to the new srcpad */
+ gst_pad_sticky_events_foreach (demux->sinkpad, forward_sticky_events,
+ demux->active_srcpad);
+ gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->active_srcpad);
+ } else {
+ GST_OBJECT_UNLOCK (demux);
+ goto fail_create_srcpad;
+ }
+ } else if (demux->active_srcpad != active_srcpad) {
+ demux->active_srcpad = active_srcpad;
+ GST_OBJECT_UNLOCK (demux);
+
+ g_object_notify (G_OBJECT (demux), "active-pad");
+ } else
+ GST_OBJECT_UNLOCK (demux);
+ }
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START
+ || GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP
+ || GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+ res = gst_pad_event_default (pad, parent, event);
+ } else if (demux->active_srcpad) {
+ GstPad *srcpad = NULL;
+ GST_OBJECT_LOCK (demux);
+ srcpad = gst_object_ref (demux->active_srcpad);
+ GST_OBJECT_UNLOCK (demux);
+ res = gst_pad_push_event (srcpad, event);
+ gst_object_unref (srcpad);
+ } else {
+ gst_event_unref (event);
+ }
+ return res;
+
+ /* ERRORS */
+no_stream_id:
+ {
+ GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
+ ("Error occurred trying to get stream-id to create a srcpad"),
+ ("no stream-id found at %s", GST_EVENT_TYPE_NAME (event)));
+
+ gst_event_unref (event);
+ return FALSE;
+ }
+
+fail_create_srcpad:
+ {
+ GST_ELEMENT_ERROR (demux, STREAM, FAILED,
+ ("Error occurred trying to create a srcpad"),
+ ("Failed to create a srcpad via stream-id:%s", stream_id));
+ gst_event_unref (event);
+ return FALSE;
+ }
+}
+
+static void
+gst_streamid_demux_release_srcpad (const GValue * item,
+ GstStreamidDemux * demux)
+{
+ GstPad *pad = g_value_get_object (item);
+
+ if (pad != NULL) {
+ gst_pad_set_active (pad, FALSE);
+ gst_element_remove_pad (GST_ELEMENT_CAST (demux), pad);
+ }
+}
+
+static void
+gst_streamid_demux_reset (GstStreamidDemux * demux)
+{
+ GstIterator *it = NULL;
+ GstIteratorResult itret = GST_ITERATOR_OK;
+
+ GST_OBJECT_LOCK (demux);
+ if (demux->active_srcpad != NULL)
+ demux->active_srcpad = NULL;
+
+ GST_OBJECT_UNLOCK (demux);
+
+ if (demux->stream_id_pairs != NULL) {
+ g_hash_table_unref (demux->stream_id_pairs);
+ demux->stream_id_pairs = NULL;
+ }
+
+ it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
+ while (itret == GST_ITERATOR_OK || itret == GST_ITERATOR_RESYNC) {
+ itret =
+ gst_iterator_foreach (it,
+ (GstIteratorForeachFunction) gst_streamid_demux_release_srcpad, demux);
+ if (itret == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (it);
+ }
+ gst_iterator_free (it);
+}
+
+static GstStateChangeReturn
+gst_streamid_demux_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstStreamidDemux *demux;
+ GstStateChangeReturn result;
+
+ demux = GST_STREAMID_DEMUX (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ break;
+ default:
+ break;
+ }
+
+ result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_streamid_demux_reset (demux);
+ break;
+ default:
+ break;
+ }
+
+ return result;
+}
diff --git a/plugins/elements/gststreamiddemux.h b/plugins/elements/gststreamiddemux.h
new file mode 100644
index 000000000..ec5383b2c
--- /dev/null
+++ b/plugins/elements/gststreamiddemux.h
@@ -0,0 +1,66 @@
+/*
+ * GStreamer streamiddemux eleement
+ *
+ * Copyright 2013 LGE Corporation.
+ * @author: Hoonhee Lee <hoonhee.lee@lge.com>
+ * @author: Jeongseok Kim <jeongseok.kim@lge.com>
+ * @author: Wonchul Lee <wonchul86.lee@lge.com>
+ *
+ * gststreamiddemux.h: Simple stream-id-demultiplexer element
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef __GST_STREAMID_DEMUX_H__
+#define __GST_STREAMID_DEMUX_H__
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+#define GST_TYPE_STREAMID_DEMUX \
+ (gst_streamid_demux_get_type())
+#define GST_STREAMID_DEMUX(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMID_DEMUX, GstStreamidDemux))
+#define GST_STREAMID_DEMUX_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMID_DEMUX, GstStreamidDemuxClass))
+#define GST_IS_STREAMID_DEMUX(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMID_DEMUX))
+#define GST_IS_STREAMID_DEMUX_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMID_DEMUX))
+typedef struct _GstStreamidDemux GstStreamidDemux;
+typedef struct _GstStreamidDemuxClass GstStreamidDemuxClass;
+
+struct _GstStreamidDemux
+{
+ GstElement element;
+
+ GstPad *sinkpad;
+
+ guint nb_srcpads;
+ GstPad *active_srcpad;
+
+ /* This table contains srcpad and stream-id */
+ GHashTable *stream_id_pairs;
+};
+
+struct _GstStreamidDemuxClass
+{
+ GstElementClass parent_class;
+};
+
+G_GNUC_INTERNAL GType gst_streamid_demux_get_type (void);
+
+G_END_DECLS
+#endif /* __GST_STREAMID_DEMUX_H__ */
diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am
index 36d0ebc03..5e7e5ab8b 100644
--- a/tests/check/Makefile.am
+++ b/tests/check/Makefile.am
@@ -95,6 +95,7 @@ REGISTRY_CHECKS = \
elements/queue \
elements/queue2 \
elements/valve \
+ elements/streamiddemux \
libs/baseparse \
libs/basesrc \
libs/basesink \
diff --git a/tests/check/elements/streamiddemux.c b/tests/check/elements/streamiddemux.c
new file mode 100644
index 000000000..8c10bce72
--- /dev/null
+++ b/tests/check/elements/streamiddemux.c
@@ -0,0 +1,514 @@
+/* GStreamer unit tests for the streamiddemux
+ *
+ * Copyright 2013 LGE Corporation.
+ * @author: Hoonhee Lee <hoonhee.lee@lge.com>
+ * @author: Jeongseok Kim <jeongseok.kim@lge.com>
+ * @author: Wonchul Lee <wonchul86.lee@lge.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gst/check/gstcheck.h>
+#include <stdlib.h>
+
+#define NUM_SUBSTREAMS 100
+#define NUM_BUFFER 1000
+
+static GstPad *active_srcpad;
+
+struct TestData
+{
+ GstElement *demux;
+ GstPad *mysrc, *mysink[NUM_SUBSTREAMS];
+ GstPad *demuxsink, *demuxsrc[NUM_SUBSTREAMS];
+ gint srcpad_cnt;
+ GstCaps *mycaps;
+ GstCaps *caps[NUM_SUBSTREAMS];
+ GstSegment segment[NUM_SUBSTREAMS];
+ gchar *stream_ids[NUM_SUBSTREAMS];
+};
+
+static void
+set_active_srcpad (struct TestData *td)
+{
+ if (active_srcpad)
+ gst_object_unref (active_srcpad);
+
+ g_object_get (td->demux, "active-pad", &active_srcpad, NULL);
+}
+
+static void
+release_test_objects (struct TestData *td)
+{
+ fail_unless (gst_element_set_state (td->demux, GST_STATE_NULL) ==
+ GST_STATE_CHANGE_SUCCESS);
+
+ gst_object_unref (td->demuxsink);
+
+ gst_caps_unref (td->mycaps);
+
+ if (active_srcpad)
+ gst_object_unref (active_srcpad);
+
+ gst_object_unref (td->demux);
+}
+
+static void
+src_pad_added_cb (GstElement * demux, GstPad * pad, struct TestData *td)
+{
+ if (td->srcpad_cnt < NUM_SUBSTREAMS) {
+ td->demuxsrc[td->srcpad_cnt] = pad;
+ fail_unless (gst_pad_link (pad,
+ td->mysink[td->srcpad_cnt++]) == GST_PAD_LINK_OK);
+ }
+}
+
+static void
+setup_test_objects (struct TestData *td)
+{
+ td->mycaps = gst_caps_new_empty_simple ("test/test");
+ td->srcpad_cnt = 0;
+
+ td->demux = gst_element_factory_make ("streamiddemux", NULL);
+ fail_unless (td->demux != NULL);
+ g_signal_connect (td->demux, "pad-added", G_CALLBACK (src_pad_added_cb), td);
+ td->demuxsink = gst_element_get_static_pad (td->demux, "sink");
+ fail_unless (td->demuxsink != NULL);
+
+ fail_unless (gst_element_set_state (td->demux, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_SUCCESS);
+}
+
+static GstFlowReturn
+chain_ok (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstPad *peer_pad = NULL;
+ gchar *pad_stream_id, *active_srcpad_stream_id;
+
+ peer_pad = gst_pad_get_peer (active_srcpad);
+ pad_stream_id = gst_pad_get_stream_id (pad);
+ active_srcpad_stream_id = gst_pad_get_stream_id (active_srcpad);
+ fail_unless (pad == peer_pad);
+ fail_unless (g_strcmp0 (pad_stream_id, active_srcpad_stream_id) == 0);
+
+ g_free (pad_stream_id);
+ g_free (active_srcpad_stream_id);
+ gst_object_unref (peer_pad);
+ gst_buffer_unref (buffer);
+
+ return GST_FLOW_OK;
+}
+
+GST_START_TEST (test_simple_create_destroy)
+{
+ GstElement *demux;
+
+ demux = gst_element_factory_make ("streamiddemux", NULL);
+ gst_object_unref (demux);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_streamiddemux_with_stream_start)
+{
+ struct TestData td;
+
+ setup_test_objects (&td);
+
+ GST_DEBUG ("Creating mysink");
+ td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+ gst_pad_set_active (td.mysink[0], TRUE);
+
+ GST_DEBUG ("Creating mysrc");
+ td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+ fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+ gst_pad_set_active (td.mysrc, TRUE);
+
+ GST_DEBUG ("Pushing stream-start event");
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_stream_start ("test0")));
+
+ g_object_get (td.demux, "active-pad", &active_srcpad, NULL);
+ fail_unless (active_srcpad != NULL, "Failed to generate a srcpad");
+ fail_unless (td.srcpad_cnt == 1, "pad-added signal has not emmited");
+
+ GST_DEBUG ("Releasing mysink and mysrc");
+ gst_pad_set_active (td.mysink[0], FALSE);
+ gst_pad_set_active (td.mysrc, FALSE);
+
+ gst_object_unref (td.mysink[0]);
+ gst_object_unref (td.mysrc);
+
+ GST_DEBUG ("Releasing streamiddemux");
+ release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_streamiddemux_without_stream_start)
+{
+ struct TestData td;
+ GstSegment segment;
+
+ setup_test_objects (&td);
+
+ GST_DEBUG ("Creating mysink");
+ td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+ gst_pad_set_active (td.mysink[0], TRUE);
+
+ GST_DEBUG ("Creating mysrc");
+ td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+ fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+ gst_pad_set_active (td.mysrc, TRUE);
+
+ GST_DEBUG ("Pushing caps and segment event without stream-start");
+ fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_caps (td.mycaps)));
+ gst_segment_init (&segment, GST_FORMAT_BYTES);
+ fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_segment (&segment)));
+
+ g_object_get (td.demux, "active-pad", &active_srcpad, NULL);
+ fail_unless (active_srcpad == NULL, "srcpad has created unexpectedly");
+ fail_unless (td.srcpad_cnt == 0, "pad-added signal is emmited unexpectedly");
+
+ GST_DEBUG ("Releasing mysink and mysrc");
+ gst_pad_set_active (td.mysink[0], FALSE);
+ gst_pad_set_active (td.mysrc, FALSE);
+
+ gst_object_unref (td.mysink[0]);
+ gst_object_unref (td.mysrc);
+
+ GST_DEBUG ("Releasing streamiddemux");
+ release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_streamiddemux_simple)
+{
+ struct TestData td;
+
+ setup_test_objects (&td);
+
+ GST_DEBUG ("Creating mysink");
+ td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+ td.mysink[0]->chaindata = &td;
+ gst_pad_set_chain_function (td.mysink[0], chain_ok);
+ gst_pad_set_active (td.mysink[0], TRUE);
+
+ td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK);
+ td.mysink[1]->chaindata = &td;
+ gst_pad_set_chain_function (td.mysink[1], chain_ok);
+ gst_pad_set_active (td.mysink[1], TRUE);
+
+ GST_DEBUG ("Creating mysrc");
+ td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+ fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+ gst_pad_set_active (td.mysrc, TRUE);
+
+ GST_DEBUG ("Pushing stream-start, caps and segment event");
+ gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+ GST_FORMAT_BYTES, "test0");
+ set_active_srcpad (&td);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+ gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+ GST_FORMAT_BYTES, "test1");
+ set_active_srcpad (&td);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+ GST_DEBUG ("Pushing buffer");
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_stream_start ("test0")));
+ set_active_srcpad (&td);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_stream_start ("test1")));
+ set_active_srcpad (&td);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+ GST_DEBUG ("Releasing mysink and mysrc");
+ gst_pad_set_active (td.mysink[0], FALSE);
+ gst_pad_set_active (td.mysink[1], FALSE);
+ gst_pad_set_active (td.mysrc, FALSE);
+
+ gst_object_unref (td.mysink[0]);
+ gst_object_unref (td.mysink[1]);
+ gst_object_unref (td.mysrc);
+
+ GST_DEBUG ("Releasing streamiddemux");
+ release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+GList *expected[NUM_SUBSTREAMS];
+
+static gboolean
+sink_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GList **expected = GST_PAD_ELEMENT_PRIVATE (pad);
+ GstEvent *exp;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CAPS:{
+ GstCaps *recvcaps, *expectcaps;
+
+ *expected = g_list_first (*expected);
+ exp = GST_EVENT ((*expected)->data);
+
+ gst_event_parse_caps (event, &recvcaps);
+ gst_event_parse_caps (exp, &expectcaps);
+
+ fail_unless (gst_caps_is_equal (recvcaps, expectcaps));
+ break;
+ }
+ case GST_EVENT_SEGMENT:{
+ const GstSegment *recvseg, *expectseg;
+
+ *expected = g_list_last (*expected);
+ exp = GST_EVENT ((*expected)->data);
+
+ gst_event_parse_segment (event, &recvseg);
+ gst_event_parse_segment (exp, &expectseg);
+
+ fail_unless_equals_uint64 (recvseg->position, expectseg->position);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return gst_pad_event_default (pad, parent, event);
+}
+
+GST_START_TEST (test_streamiddemux_num_buffers)
+{
+ struct TestData td;
+ gint buffer_cnt = 0;
+ gint stream_cnt = 0;
+ GstEvent *event;
+
+ setup_test_objects (&td);
+
+ GST_DEBUG ("Creating mysink");
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+ gchar *name;
+ name = g_strdup_printf ("mysink%d", stream_cnt);
+ td.mysink[stream_cnt] = gst_pad_new (name, GST_PAD_SINK);
+ g_free (name);
+ gst_pad_set_chain_function (td.mysink[stream_cnt], chain_ok);
+ gst_pad_set_event_function (td.mysink[stream_cnt], sink_event_func);
+ gst_pad_set_active (td.mysink[stream_cnt], TRUE);
+ GST_PAD_ELEMENT_PRIVATE (td.mysink[stream_cnt]) = &expected[stream_cnt];
+ }
+
+ GST_DEBUG ("Creating mysrc");
+ td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+ fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+ gst_pad_set_active (td.mysrc, TRUE);
+
+ GST_DEBUG ("Creating caps");
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+ gchar *caps_name;
+ caps_name = g_strdup_printf ("test/test%d", stream_cnt);
+ td.caps[stream_cnt] = gst_caps_new_empty_simple (caps_name);
+
+ g_free (caps_name);
+ }
+
+ GST_DEBUG ("Creating segment");
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+ gst_segment_init (&td.segment[stream_cnt], GST_FORMAT_BYTES);
+ td.segment[stream_cnt].position = stream_cnt * GST_SECOND;
+ }
+
+ GST_DEBUG ("Pushing stream-start, caps and segment event");
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+ gchar *name;
+ name = g_strdup_printf ("test%d", stream_cnt);
+
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_stream_start (name)));
+
+ event = gst_event_new_caps (td.caps[stream_cnt]);
+ expected[stream_cnt] =
+ g_list_append (expected[stream_cnt], gst_event_ref (event));
+ fail_unless (gst_pad_push_event (td.mysrc, event));
+
+ event = gst_event_new_segment (&td.segment[stream_cnt]);
+ expected[stream_cnt] =
+ g_list_append (expected[stream_cnt], gst_event_ref (event));
+ fail_unless (gst_pad_push_event (td.mysrc, event));
+
+ g_free (name);
+ set_active_srcpad (&td);
+
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+ }
+
+ GST_DEBUG ("Pushing buffers to random srcpad");
+ for (buffer_cnt = 0; buffer_cnt < NUM_BUFFER; ++buffer_cnt) {
+ gchar *name;
+ gint active_stream = rand () % NUM_SUBSTREAMS;
+ name = g_strdup_printf ("test%d", active_stream);
+
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_stream_start (name)));
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_caps (td.caps[active_stream])));
+ fail_unless (gst_pad_push_event (td.mysrc,
+ gst_event_new_segment (&td.segment[active_stream])));
+
+ g_free (name);
+ set_active_srcpad (&td);
+
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+ }
+
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt)
+ gst_caps_unref (td.caps[stream_cnt]);
+
+ GST_DEBUG ("Releasing mysink and mysrc");
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+ gst_pad_set_active (td.mysink[stream_cnt], FALSE);
+ }
+ gst_pad_set_active (td.mysrc, FALSE);
+
+ for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+ gst_object_unref (td.mysink[stream_cnt]);
+ }
+ gst_object_unref (td.mysrc);
+
+ GST_DEBUG ("Releasing streamiddemux");
+ release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+guint num_eos = 0;
+guint num_flush_start = 0;
+guint num_flush_stop = 0;
+
+static gboolean
+event_func (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_STREAM_START:
+ ++num_flush_start;
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ ++num_flush_stop;
+ break;
+ case GST_EVENT_EOS:
+ ++num_eos;
+ break;
+ default:
+ break;
+ }
+
+ return gst_pad_event_default (pad, parent, event);
+}
+
+GST_START_TEST (test_streamiddemux_eos)
+{
+ struct TestData td;
+
+ setup_test_objects (&td);
+
+ num_eos = 0;
+
+ GST_DEBUG ("Creating mysink");
+ td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+ gst_pad_set_chain_function (td.mysink[0], chain_ok);
+ gst_pad_set_event_function (td.mysink[0], event_func);
+ gst_pad_set_active (td.mysink[0], TRUE);
+
+ td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK);
+ gst_pad_set_chain_function (td.mysink[1], chain_ok);
+ gst_pad_set_event_function (td.mysink[1], event_func);
+ gst_pad_set_active (td.mysink[1], TRUE);
+
+ GST_DEBUG ("Creating mysrc");
+ td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+ fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+ gst_pad_set_active (td.mysrc, TRUE);
+
+ GST_DEBUG ("Pushing stream-start, caps and segment event");
+ gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+ GST_FORMAT_BYTES, "test0");
+ set_active_srcpad (&td);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+ gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+ GST_FORMAT_BYTES, "test1");
+ set_active_srcpad (&td);
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+ GST_DEBUG ("Pushing flush event");
+ fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_start ()));
+ fail_unless (num_flush_start == 2,
+ "Failed to send flush-start event to all pads internally linked");
+ fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_stop (TRUE)));
+ fail_unless (num_flush_stop == 2,
+ "Failed to send flush-stop event to all pads internally linked");
+
+ GST_DEBUG ("Pushing eos event");
+ fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_eos ()));
+ fail_unless (num_eos == 2,
+ "Failed to send eos event to all pads internally linked");
+
+ fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_EOS);
+
+ GST_DEBUG ("Releasing mysink and mysrc");
+ gst_pad_set_active (td.mysink[0], FALSE);
+ gst_pad_set_active (td.mysink[1], FALSE);
+ gst_pad_set_active (td.mysrc, FALSE);
+
+ gst_object_unref (td.mysink[0]);
+ gst_object_unref (td.mysink[1]);
+ gst_object_unref (td.mysrc);
+
+ GST_DEBUG ("Releasing streamiddemux");
+ release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+static Suite *
+streamiddemux_suite (void)
+{
+ Suite *s = suite_create ("streamiddemux");
+ TCase *tc_chain;
+
+ tc_chain = tcase_create ("streamiddemux simple");
+ tcase_add_test (tc_chain, test_simple_create_destroy);
+ tcase_add_test (tc_chain, test_streamiddemux_with_stream_start);
+ tcase_add_test (tc_chain, test_streamiddemux_without_stream_start);
+ tcase_add_test (tc_chain, test_streamiddemux_simple);
+ tcase_add_test (tc_chain, test_streamiddemux_num_buffers);
+ tcase_add_test (tc_chain, test_streamiddemux_eos);
+ suite_add_tcase (s, tc_chain);
+
+ return s;
+}
+
+GST_CHECK_MAIN (streamiddemux);
diff --git a/tests/examples/Makefile.am b/tests/examples/Makefile.am
index 376bac887..fce72a15c 100644
--- a/tests/examples/Makefile.am
+++ b/tests/examples/Makefile.am
@@ -20,6 +20,7 @@ always_dirs = \
netclock \
queue \
stepping \
+ streamiddemux \
streams \
typefind
diff --git a/tests/examples/streamiddemux/Makefile.am b/tests/examples/streamiddemux/Makefile.am
new file mode 100644
index 000000000..e182d2988
--- /dev/null
+++ b/tests/examples/streamiddemux/Makefile.am
@@ -0,0 +1,6 @@
+noinst_PROGRAMS = streamiddemux-stream
+
+streamiddemux_stream_SOURCES = streamiddemux-stream.c
+streamiddemux_stream_LDADD = $(GST_OBJ_LIBS)
+streamiddemux_stream_CFLAGS = $(GST_OBJ_CFLAGS)
+
diff --git a/tests/examples/streamiddemux/streamiddemux-stream.c b/tests/examples/streamiddemux/streamiddemux-stream.c
new file mode 100644
index 000000000..1ef128b2b
--- /dev/null
+++ b/tests/examples/streamiddemux/streamiddemux-stream.c
@@ -0,0 +1,241 @@
+#include <gst/gst.h>
+
+#define NUM_STREAM 13
+
+typedef struct _App App;
+
+struct _App
+{
+ GstElement *pipeline;
+ GstElement *audiotestsrc[NUM_STREAM];
+ GstElement *audioconvert[NUM_STREAM];
+ GstElement *capsfilter[NUM_STREAM];
+ GstElement *vorbisenc[NUM_STREAM];
+ GstElement *oggmux[NUM_STREAM];
+ GstElement *funnel;
+ GstElement *demux;
+ GstElement *stream_synchronizer;
+ GstElement *queue[NUM_STREAM];
+ GstElement *filesink[NUM_STREAM];
+
+ gboolean pad_blocked[NUM_STREAM];
+ GstPad *queue_srcpad[NUM_STREAM];
+ gulong blocked_id[NUM_STREAM];
+};
+
+App s_app;
+
+gint pad_added_cnt = 0;
+
+static gboolean
+bus_call (GstBus * bus, GstMessage * msg, gpointer data)
+{
+ GMainLoop *loop = (GMainLoop *) data;
+
+ switch (GST_MESSAGE_TYPE (msg)) {
+ case GST_MESSAGE_EOS:{
+ g_main_loop_quit (loop);
+ break;
+ }
+ case GST_MESSAGE_ERROR:{
+ g_main_loop_quit (loop);
+ break;
+ }
+ default:
+ break;
+ }
+ return TRUE;
+}
+
+static void
+set_blocked (App * app, gboolean blocked)
+{
+ gint i = 0;
+
+ for (i = 0; i < NUM_STREAM; i++) {
+ gst_pad_remove_probe (app->queue_srcpad[i], app->blocked_id[i]);
+ }
+}
+
+static void
+sink_do_reconfigure (App * app)
+{
+ gint i = 0;
+ GstPad *filesink_sinkpad[NUM_STREAM];
+ GstPad *sync_sinkpad[NUM_STREAM];
+ GstPad *sync_srcpad[NUM_STREAM];
+ GstIterator *it;
+ GValue item = G_VALUE_INIT;
+
+ for (i = 0; i < NUM_STREAM; i++) {
+ sync_sinkpad[i] =
+ gst_element_get_request_pad (app->stream_synchronizer, "sink_%u");
+ it = gst_pad_iterate_internal_links (sync_sinkpad[i]);
+ g_assert (it);
+ gst_iterator_next (it, &item);
+ sync_srcpad[i] = g_value_dup_object (&item);
+ g_value_unset (&item);
+
+ filesink_sinkpad[i] = gst_element_get_static_pad (app->filesink[i], "sink");
+
+ gst_pad_link_full (app->queue_srcpad[i], sync_sinkpad[i],
+ GST_PAD_LINK_CHECK_NOTHING);
+ gst_pad_link_full (sync_srcpad[i], filesink_sinkpad[i],
+ GST_PAD_LINK_CHECK_NOTHING);
+ }
+ gst_iterator_free (it);
+
+}
+
+static GstPadProbeReturn
+blocked_cb (GstPad * blockedpad, GstPadProbeInfo * info, gpointer user_data)
+{
+ App *app = user_data;
+ gint i = 0;
+ gboolean all_pads_blocked = TRUE;
+
+ for (i = 0; i < NUM_STREAM; i++) {
+ if (blockedpad == app->queue_srcpad[i])
+ app->pad_blocked[i] = TRUE;
+ }
+
+ for (i = 0; i < NUM_STREAM; i++) {
+ if (app->queue_srcpad[i] == FALSE) {
+ all_pads_blocked = FALSE;
+ break;
+ }
+ }
+
+ if (all_pads_blocked == TRUE) {
+ sink_do_reconfigure (app);
+ set_blocked (app, FALSE);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+static void
+src_pad_added_cb (GstElement * demux, GstPad * pad, App * app)
+{
+ GstPad *queue_sinkpad[NUM_STREAM];
+
+ queue_sinkpad[pad_added_cnt] =
+ gst_element_get_static_pad (app->queue[pad_added_cnt], "sink");
+ gst_pad_link_full (pad, queue_sinkpad[pad_added_cnt],
+ GST_PAD_LINK_CHECK_NOTHING);
+
+ app->queue_srcpad[pad_added_cnt] =
+ gst_element_get_static_pad (app->queue[pad_added_cnt], "src");
+ app->blocked_id[pad_added_cnt] =
+ gst_pad_add_probe (app->queue_srcpad[pad_added_cnt],
+ GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, app, NULL);
+
+ pad_added_cnt++;
+}
+
+gint
+main (gint argc, gchar * argv[])
+{
+ App *app = &s_app;
+
+ GMainLoop *loop = NULL;
+ GstBus *bus;
+ guint bus_watch_id;
+
+ GstPad *funnel_sinkpad[NUM_STREAM];
+ GstPad *funnel_srcpad;
+ GstPad *demux_sinkpad;
+ GstPad *oggmux_srcpad[NUM_STREAM];
+
+ guint stream_cnt = 0;
+ GstCaps *caps;
+
+ gst_init (&argc, &argv);
+
+ app->pipeline = gst_pipeline_new ("pipeline");
+
+ for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+ app->audiotestsrc[stream_cnt] =
+ gst_element_factory_make ("audiotestsrc", NULL);
+ app->audioconvert[stream_cnt] =
+ gst_element_factory_make ("audioconvert", NULL);
+ app->capsfilter[stream_cnt] = gst_element_factory_make ("capsfilter", NULL);
+ app->vorbisenc[stream_cnt] = gst_element_factory_make ("vorbisenc", NULL);
+ app->oggmux[stream_cnt] = gst_element_factory_make ("oggmux", NULL);
+ }
+
+ app->funnel = gst_element_factory_make ("funnel", NULL);
+ app->demux = gst_element_factory_make ("streamiddemux", NULL);
+ app->stream_synchronizer =
+ gst_element_factory_make ("streamsynchronizer", NULL);
+
+ caps = gst_caps_from_string ("audio/x-raw,channels=1;");
+
+ stream_cnt = 0;
+
+ for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+ app->queue[stream_cnt] = gst_element_factory_make ("queue", NULL);
+ app->filesink[stream_cnt] = gst_element_factory_make ("filesink", NULL);
+
+ g_object_set (app->audiotestsrc[stream_cnt], "wave", stream_cnt,
+ "num-buffers", 2000, NULL);
+ g_object_set (app->capsfilter[stream_cnt], "caps", caps, NULL);
+ g_object_set (app->filesink[stream_cnt], "location",
+ g_strdup_printf ("filesink_%d.ogg", stream_cnt), NULL);
+ }
+
+ stream_cnt = 0;
+
+ g_signal_connect (app->demux, "pad-added", G_CALLBACK (src_pad_added_cb),
+ app);
+
+ loop = g_main_loop_new (NULL, FALSE);
+
+ bus = gst_element_get_bus (app->pipeline);
+ bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
+ g_object_unref (bus);
+
+ for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+ gst_bin_add_many (GST_BIN (app->pipeline), app->audiotestsrc[stream_cnt],
+ app->audioconvert[stream_cnt], app->capsfilter[stream_cnt],
+ app->vorbisenc[stream_cnt], app->oggmux[stream_cnt],
+ app->queue[stream_cnt], app->filesink[stream_cnt], NULL);
+ if (stream_cnt == 0) {
+ gst_bin_add_many (GST_BIN (app->pipeline), app->funnel, app->demux,
+ app->stream_synchronizer, NULL);
+ }
+ }
+
+ stream_cnt = 0;
+
+ for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+ gst_element_link_many (app->audiotestsrc[stream_cnt],
+ app->audioconvert[stream_cnt], app->capsfilter[stream_cnt],
+ app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], NULL);
+ }
+
+ stream_cnt = 0;
+
+ for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+ funnel_sinkpad[stream_cnt] =
+ gst_element_get_request_pad (app->funnel, "sink_%u");
+ oggmux_srcpad[stream_cnt] =
+ gst_element_get_static_pad (app->oggmux[stream_cnt], "src");
+ gst_pad_link (oggmux_srcpad[stream_cnt], funnel_sinkpad[stream_cnt]);
+ }
+
+ funnel_srcpad = gst_element_get_static_pad (app->funnel, "src");
+
+ demux_sinkpad = gst_element_get_static_pad (app->demux, "sink");
+ gst_pad_link (funnel_srcpad, demux_sinkpad);
+
+ gst_element_set_state (app->pipeline, GST_STATE_PLAYING);
+ g_main_loop_run (loop);
+
+ gst_element_set_state (app->pipeline, GST_STATE_NULL);
+ g_object_unref (app->pipeline);
+ g_source_remove (bus_watch_id);
+ g_main_loop_unref (loop);
+
+ return 0;
+}