diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/check/Makefile.am | 1 | ||||
-rw-r--r-- | tests/check/elements/streamiddemux.c | 514 | ||||
-rw-r--r-- | tests/examples/Makefile.am | 1 | ||||
-rw-r--r-- | tests/examples/streamiddemux/Makefile.am | 6 | ||||
-rw-r--r-- | tests/examples/streamiddemux/streamiddemux-stream.c | 241 |
5 files changed, 763 insertions, 0 deletions
diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index 36d0ebc03..5e7e5ab8b 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -95,6 +95,7 @@ REGISTRY_CHECKS = \ elements/queue \ elements/queue2 \ elements/valve \ + elements/streamiddemux \ libs/baseparse \ libs/basesrc \ libs/basesink \ diff --git a/tests/check/elements/streamiddemux.c b/tests/check/elements/streamiddemux.c new file mode 100644 index 000000000..8c10bce72 --- /dev/null +++ b/tests/check/elements/streamiddemux.c @@ -0,0 +1,514 @@ +/* GStreamer unit tests for the streamiddemux + * + * Copyright 2013 LGE Corporation. + * @author: Hoonhee Lee <hoonhee.lee@lge.com> + * @author: Jeongseok Kim <jeongseok.kim@lge.com> + * @author: Wonchul Lee <wonchul86.lee@lge.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <gst/check/gstcheck.h> +#include <stdlib.h> + +#define NUM_SUBSTREAMS 100 +#define NUM_BUFFER 1000 + +static GstPad *active_srcpad; + +struct TestData +{ + GstElement *demux; + GstPad *mysrc, *mysink[NUM_SUBSTREAMS]; + GstPad *demuxsink, *demuxsrc[NUM_SUBSTREAMS]; + gint srcpad_cnt; + GstCaps *mycaps; + GstCaps *caps[NUM_SUBSTREAMS]; + GstSegment segment[NUM_SUBSTREAMS]; + gchar *stream_ids[NUM_SUBSTREAMS]; +}; + +static void +set_active_srcpad (struct TestData *td) +{ + if (active_srcpad) + gst_object_unref (active_srcpad); + + g_object_get (td->demux, "active-pad", &active_srcpad, NULL); +} + +static void +release_test_objects (struct TestData *td) +{ + fail_unless (gst_element_set_state (td->demux, GST_STATE_NULL) == + GST_STATE_CHANGE_SUCCESS); + + gst_object_unref (td->demuxsink); + + gst_caps_unref (td->mycaps); + + if (active_srcpad) + gst_object_unref (active_srcpad); + + gst_object_unref (td->demux); +} + +static void +src_pad_added_cb (GstElement * demux, GstPad * pad, struct TestData *td) +{ + if (td->srcpad_cnt < NUM_SUBSTREAMS) { + td->demuxsrc[td->srcpad_cnt] = pad; + fail_unless (gst_pad_link (pad, + td->mysink[td->srcpad_cnt++]) == GST_PAD_LINK_OK); + } +} + +static void +setup_test_objects (struct TestData *td) +{ + td->mycaps = gst_caps_new_empty_simple ("test/test"); + td->srcpad_cnt = 0; + + td->demux = gst_element_factory_make ("streamiddemux", NULL); + fail_unless (td->demux != NULL); + g_signal_connect (td->demux, "pad-added", G_CALLBACK (src_pad_added_cb), td); + td->demuxsink = gst_element_get_static_pad (td->demux, "sink"); + fail_unless (td->demuxsink != NULL); + + fail_unless (gst_element_set_state (td->demux, GST_STATE_PLAYING) == + GST_STATE_CHANGE_SUCCESS); +} + +static GstFlowReturn +chain_ok (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstPad *peer_pad = NULL; + gchar *pad_stream_id, *active_srcpad_stream_id; + + peer_pad = gst_pad_get_peer (active_srcpad); + pad_stream_id = gst_pad_get_stream_id (pad); + active_srcpad_stream_id = gst_pad_get_stream_id (active_srcpad); + fail_unless (pad == peer_pad); + fail_unless (g_strcmp0 (pad_stream_id, active_srcpad_stream_id) == 0); + + g_free (pad_stream_id); + g_free (active_srcpad_stream_id); + gst_object_unref (peer_pad); + gst_buffer_unref (buffer); + + return GST_FLOW_OK; +} + +GST_START_TEST (test_simple_create_destroy) +{ + GstElement *demux; + + demux = gst_element_factory_make ("streamiddemux", NULL); + gst_object_unref (demux); +} + +GST_END_TEST; + +GST_START_TEST (test_streamiddemux_with_stream_start) +{ + struct TestData td; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + gst_pad_set_active (td.mysink[0], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing stream-start event"); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start ("test0"))); + + g_object_get (td.demux, "active-pad", &active_srcpad, NULL); + fail_unless (active_srcpad != NULL, "Failed to generate a srcpad"); + fail_unless (td.srcpad_cnt == 1, "pad-added signal has not emmited"); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +GST_START_TEST (test_streamiddemux_without_stream_start) +{ + struct TestData td; + GstSegment segment; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + gst_pad_set_active (td.mysink[0], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing caps and segment event without stream-start"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_caps (td.mycaps))); + gst_segment_init (&segment, GST_FORMAT_BYTES); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_segment (&segment))); + + g_object_get (td.demux, "active-pad", &active_srcpad, NULL); + fail_unless (active_srcpad == NULL, "srcpad has created unexpectedly"); + fail_unless (td.srcpad_cnt == 0, "pad-added signal is emmited unexpectedly"); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +GST_START_TEST (test_streamiddemux_simple) +{ + struct TestData td; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + td.mysink[0]->chaindata = &td; + gst_pad_set_chain_function (td.mysink[0], chain_ok); + gst_pad_set_active (td.mysink[0], TRUE); + + td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK); + td.mysink[1]->chaindata = &td; + gst_pad_set_chain_function (td.mysink[1], chain_ok); + gst_pad_set_active (td.mysink[1], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing stream-start, caps and segment event"); + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test0"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test1"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + GST_DEBUG ("Pushing buffer"); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start ("test0"))); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start ("test1"))); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysink[1], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysink[1]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +GList *expected[NUM_SUBSTREAMS]; + +static gboolean +sink_event_func (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GList **expected = GST_PAD_ELEMENT_PRIVATE (pad); + GstEvent *exp; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS:{ + GstCaps *recvcaps, *expectcaps; + + *expected = g_list_first (*expected); + exp = GST_EVENT ((*expected)->data); + + gst_event_parse_caps (event, &recvcaps); + gst_event_parse_caps (exp, &expectcaps); + + fail_unless (gst_caps_is_equal (recvcaps, expectcaps)); + break; + } + case GST_EVENT_SEGMENT:{ + const GstSegment *recvseg, *expectseg; + + *expected = g_list_last (*expected); + exp = GST_EVENT ((*expected)->data); + + gst_event_parse_segment (event, &recvseg); + gst_event_parse_segment (exp, &expectseg); + + fail_unless_equals_uint64 (recvseg->position, expectseg->position); + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +GST_START_TEST (test_streamiddemux_num_buffers) +{ + struct TestData td; + gint buffer_cnt = 0; + gint stream_cnt = 0; + GstEvent *event; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gchar *name; + name = g_strdup_printf ("mysink%d", stream_cnt); + td.mysink[stream_cnt] = gst_pad_new (name, GST_PAD_SINK); + g_free (name); + gst_pad_set_chain_function (td.mysink[stream_cnt], chain_ok); + gst_pad_set_event_function (td.mysink[stream_cnt], sink_event_func); + gst_pad_set_active (td.mysink[stream_cnt], TRUE); + GST_PAD_ELEMENT_PRIVATE (td.mysink[stream_cnt]) = &expected[stream_cnt]; + } + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Creating caps"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gchar *caps_name; + caps_name = g_strdup_printf ("test/test%d", stream_cnt); + td.caps[stream_cnt] = gst_caps_new_empty_simple (caps_name); + + g_free (caps_name); + } + + GST_DEBUG ("Creating segment"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gst_segment_init (&td.segment[stream_cnt], GST_FORMAT_BYTES); + td.segment[stream_cnt].position = stream_cnt * GST_SECOND; + } + + GST_DEBUG ("Pushing stream-start, caps and segment event"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gchar *name; + name = g_strdup_printf ("test%d", stream_cnt); + + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start (name))); + + event = gst_event_new_caps (td.caps[stream_cnt]); + expected[stream_cnt] = + g_list_append (expected[stream_cnt], gst_event_ref (event)); + fail_unless (gst_pad_push_event (td.mysrc, event)); + + event = gst_event_new_segment (&td.segment[stream_cnt]); + expected[stream_cnt] = + g_list_append (expected[stream_cnt], gst_event_ref (event)); + fail_unless (gst_pad_push_event (td.mysrc, event)); + + g_free (name); + set_active_srcpad (&td); + + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + } + + GST_DEBUG ("Pushing buffers to random srcpad"); + for (buffer_cnt = 0; buffer_cnt < NUM_BUFFER; ++buffer_cnt) { + gchar *name; + gint active_stream = rand () % NUM_SUBSTREAMS; + name = g_strdup_printf ("test%d", active_stream); + + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start (name))); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_caps (td.caps[active_stream]))); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_segment (&td.segment[active_stream]))); + + g_free (name); + set_active_srcpad (&td); + + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + } + + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) + gst_caps_unref (td.caps[stream_cnt]); + + GST_DEBUG ("Releasing mysink and mysrc"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gst_pad_set_active (td.mysink[stream_cnt], FALSE); + } + gst_pad_set_active (td.mysrc, FALSE); + + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gst_object_unref (td.mysink[stream_cnt]); + } + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +guint num_eos = 0; +guint num_flush_start = 0; +guint num_flush_stop = 0; + +static gboolean +event_func (GstPad * pad, GstObject * parent, GstEvent * event) +{ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_STREAM_START: + ++num_flush_start; + break; + case GST_EVENT_FLUSH_STOP: + ++num_flush_stop; + break; + case GST_EVENT_EOS: + ++num_eos; + break; + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +GST_START_TEST (test_streamiddemux_eos) +{ + struct TestData td; + + setup_test_objects (&td); + + num_eos = 0; + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + gst_pad_set_chain_function (td.mysink[0], chain_ok); + gst_pad_set_event_function (td.mysink[0], event_func); + gst_pad_set_active (td.mysink[0], TRUE); + + td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK); + gst_pad_set_chain_function (td.mysink[1], chain_ok); + gst_pad_set_event_function (td.mysink[1], event_func); + gst_pad_set_active (td.mysink[1], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing stream-start, caps and segment event"); + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test0"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test1"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + GST_DEBUG ("Pushing flush event"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_start ())); + fail_unless (num_flush_start == 2, + "Failed to send flush-start event to all pads internally linked"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_stop (TRUE))); + fail_unless (num_flush_stop == 2, + "Failed to send flush-stop event to all pads internally linked"); + + GST_DEBUG ("Pushing eos event"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_eos ())); + fail_unless (num_eos == 2, + "Failed to send eos event to all pads internally linked"); + + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_EOS); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysink[1], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysink[1]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +static Suite * +streamiddemux_suite (void) +{ + Suite *s = suite_create ("streamiddemux"); + TCase *tc_chain; + + tc_chain = tcase_create ("streamiddemux simple"); + tcase_add_test (tc_chain, test_simple_create_destroy); + tcase_add_test (tc_chain, test_streamiddemux_with_stream_start); + tcase_add_test (tc_chain, test_streamiddemux_without_stream_start); + tcase_add_test (tc_chain, test_streamiddemux_simple); + tcase_add_test (tc_chain, test_streamiddemux_num_buffers); + tcase_add_test (tc_chain, test_streamiddemux_eos); + suite_add_tcase (s, tc_chain); + + return s; +} + +GST_CHECK_MAIN (streamiddemux); diff --git a/tests/examples/Makefile.am b/tests/examples/Makefile.am index 376bac887..fce72a15c 100644 --- a/tests/examples/Makefile.am +++ b/tests/examples/Makefile.am @@ -20,6 +20,7 @@ always_dirs = \ netclock \ queue \ stepping \ + streamiddemux \ streams \ typefind diff --git a/tests/examples/streamiddemux/Makefile.am b/tests/examples/streamiddemux/Makefile.am new file mode 100644 index 000000000..e182d2988 --- /dev/null +++ b/tests/examples/streamiddemux/Makefile.am @@ -0,0 +1,6 @@ +noinst_PROGRAMS = streamiddemux-stream + +streamiddemux_stream_SOURCES = streamiddemux-stream.c +streamiddemux_stream_LDADD = $(GST_OBJ_LIBS) +streamiddemux_stream_CFLAGS = $(GST_OBJ_CFLAGS) + diff --git a/tests/examples/streamiddemux/streamiddemux-stream.c b/tests/examples/streamiddemux/streamiddemux-stream.c new file mode 100644 index 000000000..1ef128b2b --- /dev/null +++ b/tests/examples/streamiddemux/streamiddemux-stream.c @@ -0,0 +1,241 @@ +#include <gst/gst.h> + +#define NUM_STREAM 13 + +typedef struct _App App; + +struct _App +{ + GstElement *pipeline; + GstElement *audiotestsrc[NUM_STREAM]; + GstElement *audioconvert[NUM_STREAM]; + GstElement *capsfilter[NUM_STREAM]; + GstElement *vorbisenc[NUM_STREAM]; + GstElement *oggmux[NUM_STREAM]; + GstElement *funnel; + GstElement *demux; + GstElement *stream_synchronizer; + GstElement *queue[NUM_STREAM]; + GstElement *filesink[NUM_STREAM]; + + gboolean pad_blocked[NUM_STREAM]; + GstPad *queue_srcpad[NUM_STREAM]; + gulong blocked_id[NUM_STREAM]; +}; + +App s_app; + +gint pad_added_cnt = 0; + +static gboolean +bus_call (GstBus * bus, GstMessage * msg, gpointer data) +{ + GMainLoop *loop = (GMainLoop *) data; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_EOS:{ + g_main_loop_quit (loop); + break; + } + case GST_MESSAGE_ERROR:{ + g_main_loop_quit (loop); + break; + } + default: + break; + } + return TRUE; +} + +static void +set_blocked (App * app, gboolean blocked) +{ + gint i = 0; + + for (i = 0; i < NUM_STREAM; i++) { + gst_pad_remove_probe (app->queue_srcpad[i], app->blocked_id[i]); + } +} + +static void +sink_do_reconfigure (App * app) +{ + gint i = 0; + GstPad *filesink_sinkpad[NUM_STREAM]; + GstPad *sync_sinkpad[NUM_STREAM]; + GstPad *sync_srcpad[NUM_STREAM]; + GstIterator *it; + GValue item = G_VALUE_INIT; + + for (i = 0; i < NUM_STREAM; i++) { + sync_sinkpad[i] = + gst_element_get_request_pad (app->stream_synchronizer, "sink_%u"); + it = gst_pad_iterate_internal_links (sync_sinkpad[i]); + g_assert (it); + gst_iterator_next (it, &item); + sync_srcpad[i] = g_value_dup_object (&item); + g_value_unset (&item); + + filesink_sinkpad[i] = gst_element_get_static_pad (app->filesink[i], "sink"); + + gst_pad_link_full (app->queue_srcpad[i], sync_sinkpad[i], + GST_PAD_LINK_CHECK_NOTHING); + gst_pad_link_full (sync_srcpad[i], filesink_sinkpad[i], + GST_PAD_LINK_CHECK_NOTHING); + } + gst_iterator_free (it); + +} + +static GstPadProbeReturn +blocked_cb (GstPad * blockedpad, GstPadProbeInfo * info, gpointer user_data) +{ + App *app = user_data; + gint i = 0; + gboolean all_pads_blocked = TRUE; + + for (i = 0; i < NUM_STREAM; i++) { + if (blockedpad == app->queue_srcpad[i]) + app->pad_blocked[i] = TRUE; + } + + for (i = 0; i < NUM_STREAM; i++) { + if (app->queue_srcpad[i] == FALSE) { + all_pads_blocked = FALSE; + break; + } + } + + if (all_pads_blocked == TRUE) { + sink_do_reconfigure (app); + set_blocked (app, FALSE); + } + + return GST_PAD_PROBE_OK; +} + +static void +src_pad_added_cb (GstElement * demux, GstPad * pad, App * app) +{ + GstPad *queue_sinkpad[NUM_STREAM]; + + queue_sinkpad[pad_added_cnt] = + gst_element_get_static_pad (app->queue[pad_added_cnt], "sink"); + gst_pad_link_full (pad, queue_sinkpad[pad_added_cnt], + GST_PAD_LINK_CHECK_NOTHING); + + app->queue_srcpad[pad_added_cnt] = + gst_element_get_static_pad (app->queue[pad_added_cnt], "src"); + app->blocked_id[pad_added_cnt] = + gst_pad_add_probe (app->queue_srcpad[pad_added_cnt], + GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, app, NULL); + + pad_added_cnt++; +} + +gint +main (gint argc, gchar * argv[]) +{ + App *app = &s_app; + + GMainLoop *loop = NULL; + GstBus *bus; + guint bus_watch_id; + + GstPad *funnel_sinkpad[NUM_STREAM]; + GstPad *funnel_srcpad; + GstPad *demux_sinkpad; + GstPad *oggmux_srcpad[NUM_STREAM]; + + guint stream_cnt = 0; + GstCaps *caps; + + gst_init (&argc, &argv); + + app->pipeline = gst_pipeline_new ("pipeline"); + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + app->audiotestsrc[stream_cnt] = + gst_element_factory_make ("audiotestsrc", NULL); + app->audioconvert[stream_cnt] = + gst_element_factory_make ("audioconvert", NULL); + app->capsfilter[stream_cnt] = gst_element_factory_make ("capsfilter", NULL); + app->vorbisenc[stream_cnt] = gst_element_factory_make ("vorbisenc", NULL); + app->oggmux[stream_cnt] = gst_element_factory_make ("oggmux", NULL); + } + + app->funnel = gst_element_factory_make ("funnel", NULL); + app->demux = gst_element_factory_make ("streamiddemux", NULL); + app->stream_synchronizer = + gst_element_factory_make ("streamsynchronizer", NULL); + + caps = gst_caps_from_string ("audio/x-raw,channels=1;"); + + stream_cnt = 0; + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + app->queue[stream_cnt] = gst_element_factory_make ("queue", NULL); + app->filesink[stream_cnt] = gst_element_factory_make ("filesink", NULL); + + g_object_set (app->audiotestsrc[stream_cnt], "wave", stream_cnt, + "num-buffers", 2000, NULL); + g_object_set (app->capsfilter[stream_cnt], "caps", caps, NULL); + g_object_set (app->filesink[stream_cnt], "location", + g_strdup_printf ("filesink_%d.ogg", stream_cnt), NULL); + } + + stream_cnt = 0; + + g_signal_connect (app->demux, "pad-added", G_CALLBACK (src_pad_added_cb), + app); + + loop = g_main_loop_new (NULL, FALSE); + + bus = gst_element_get_bus (app->pipeline); + bus_watch_id = gst_bus_add_watch (bus, bus_call, loop); + g_object_unref (bus); + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + gst_bin_add_many (GST_BIN (app->pipeline), app->audiotestsrc[stream_cnt], + app->audioconvert[stream_cnt], app->capsfilter[stream_cnt], + app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], + app->queue[stream_cnt], app->filesink[stream_cnt], NULL); + if (stream_cnt == 0) { + gst_bin_add_many (GST_BIN (app->pipeline), app->funnel, app->demux, + app->stream_synchronizer, NULL); + } + } + + stream_cnt = 0; + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + gst_element_link_many (app->audiotestsrc[stream_cnt], + app->audioconvert[stream_cnt], app->capsfilter[stream_cnt], + app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], NULL); + } + + stream_cnt = 0; + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + funnel_sinkpad[stream_cnt] = + gst_element_get_request_pad (app->funnel, "sink_%u"); + oggmux_srcpad[stream_cnt] = + gst_element_get_static_pad (app->oggmux[stream_cnt], "src"); + gst_pad_link (oggmux_srcpad[stream_cnt], funnel_sinkpad[stream_cnt]); + } + + funnel_srcpad = gst_element_get_static_pad (app->funnel, "src"); + + demux_sinkpad = gst_element_get_static_pad (app->demux, "sink"); + gst_pad_link (funnel_srcpad, demux_sinkpad); + + gst_element_set_state (app->pipeline, GST_STATE_PLAYING); + g_main_loop_run (loop); + + gst_element_set_state (app->pipeline, GST_STATE_NULL); + g_object_unref (app->pipeline); + g_source_remove (bus_watch_id); + g_main_loop_unref (loop); + + return 0; +} |