diff options
author | Jan Schmidt <jan@centricular.com> | 2014-08-01 00:07:53 +1000 |
---|---|---|
committer | Jan Schmidt <jan@centricular.com> | 2015-02-06 04:26:59 +1100 |
commit | 5e2214d309ab45ce316243bffadf19d0cd7f7384 (patch) | |
tree | b2e0deb0169e82c778185ec8ae534ed02dbdf82c | |
parent | 59431f663a282457ada06bf1170a4f31251b9b6c (diff) |
splitmux: Implement new elements for splitting files at mux level.
Implement 2 new elements - splitmuxsink and splitmuxsrc.
splitmuxsink is a bin which wraps a muxer and takes 1 video stream,
plus audio/subtitle streams, and starts a new file
whenever necessary to avoid overrunning a threshold of either bytes
or time. New files are started at a keyframe, and corresponding audio
and subtitle streams are split at packet boundaries to match
video GOP timestamps.
splitmuxsrc is a corresponding source element which handles
the splitmux:// URL and plays back all component files,
reconstructing the original elementary streams as it goes.
23 files changed, 4633 insertions, 88 deletions
diff --git a/docs/plugins/Makefile.am b/docs/plugins/Makefile.am index 7c125adff..728574f5f 100644 --- a/docs/plugins/Makefile.am +++ b/docs/plugins/Makefile.am @@ -157,6 +157,8 @@ EXTRA_HFILES = \ $(top_srcdir)/gst/multifile/gstmultifilesink.h \ $(top_srcdir)/gst/multifile/gstmultifilesrc.h \ $(top_srcdir)/gst/multifile/gstsplitfilesrc.h \ + $(top_srcdir)/gst/multifile/gstsplitmuxsrc.h \ + $(top_srcdir)/gst/multifile/gstsplitmuxsink.h \ $(top_srcdir)/gst/multipart/multipartdemux.h \ $(top_srcdir)/gst/multipart/multipartmux.h \ $(top_srcdir)/gst/isomp4/qtdemux.h \ diff --git a/docs/plugins/gst-plugins-good-plugins-docs.sgml b/docs/plugins/gst-plugins-good-plugins-docs.sgml index d48bf00e1..9222fe483 100644 --- a/docs/plugins/gst-plugins-good-plugins-docs.sgml +++ b/docs/plugins/gst-plugins-good-plugins-docs.sgml @@ -162,6 +162,8 @@ <xi:include href="xml/element-speexenc.xml" /> <xi:include href="xml/element-speexdec.xml" /> <xi:include href="xml/element-splitfilesrc.xml" /> + <xi:include href="xml/element-splitmuxsrc.xml" /> + <xi:include href="xml/element-splitmuxsink.xml" /> <xi:include href="xml/element-streaktv.xml" /> <xi:include href="xml/element-taginject.xml" /> <xi:include href="xml/element-udpsrc.xml" /> diff --git a/docs/plugins/gst-plugins-good-plugins-sections.txt b/docs/plugins/gst-plugins-good-plugins-sections.txt index 807f4b39c..ac6b4aa10 100644 --- a/docs/plugins/gst-plugins-good-plugins-sections.txt +++ b/docs/plugins/gst-plugins-good-plugins-sections.txt @@ -2014,6 +2014,34 @@ gst_split_file_src_get_type </SECTION> <SECTION> +<FILE>element-splitmuxsrc</FILE> +<TITLE>splitmuxsrc</TITLE> +GstSplitMuxSrc +<SUBSECTION Standard> +GstSplitMuxSrcClass +GST_SPLITMUX_SRC +GST_SPLITMUX_SRC_CLASS +GST_IS_SPLITMUX_SRC +GST_IS_SPLITMUX_SRC_CLASS +GST_TYPE_SPLITMUX_SRC +gst_split_mux_src_get_type +</SECTION> + +<SECTION> +<FILE>element-splitmuxsink</FILE> +<TITLE>splitmuxsink</TITLE> +GstSplitMuxSink +<SUBSECTION Standard> +GstSplitMuxSinkClass +GST_SPLITMUX_SINK +GST_SPLITMUX_SINK_CLASS +GST_IS_SPLITMUX_SINK +GST_IS_SPLITMUX_SINK_CLASS +GST_TYPE_SPLITMUX_SINK +gst_split_mux_sink_get_type +</SECTION> + +<SECTION> <FILE>element-taginject</FILE> <TITLE>taginject</TITLE> GstTagInject diff --git a/docs/plugins/gst-plugins-good-plugins.hierarchy b/docs/plugins/gst-plugins-good-plugins.hierarchy index 75fa1949a..82b5fb0e3 100644 --- a/docs/plugins/gst-plugins-good-plugins.hierarchy +++ b/docs/plugins/gst-plugins-good-plugins.hierarchy @@ -152,6 +152,8 @@ GObject GstRTSPSrc GstRgVolume GstRtpBin + GstSplitMuxSink + GstSplitMuxSrc GstCutter GstDVDec GstDVDemux diff --git a/docs/plugins/gst-plugins-good-plugins.interfaces b/docs/plugins/gst-plugins-good-plugins.interfaces index 8dfb97d03..430142456 100644 --- a/docs/plugins/gst-plugins-good-plugins.interfaces +++ b/docs/plugins/gst-plugins-good-plugins.interfaces @@ -71,6 +71,8 @@ GstSoupHTTPSrc GstURIHandler GstSpeexEnc GstPreset GstTagSetter GstSpeexEnc GstTagSetter GstPreset GstSplitFileSrc GstURIHandler +GstSplitMuxSink GstChildProxy +GstSplitMuxSrc GstChildProxy GstURIHandler GstSwitchSink GstChildProxy GstSwitchSrc GstChildProxy GstTagLibMux GstTagSetter diff --git a/gst/multifile/Makefile.am b/gst/multifile/Makefile.am index 4195bdc20..d15bc1f49 100644 --- a/gst/multifile/Makefile.am +++ b/gst/multifile/Makefile.am @@ -6,14 +6,30 @@ libgstmultifile_la_SOURCES = \ gstmultifilesrc.c \ gstmultifile.c \ gstsplitfilesrc.c \ + gstsplitmuxsink.c \ + gstsplitmuxpartreader.c \ + gstsplitmuxsrc.c \ + gstsplitutils.c \ patternspec.c libgstmultifile_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GIO_CFLAGS) libgstmultifile_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) -lgstvideo-@GST_API_VERSION@ $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS) libgstmultifile_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) libgstmultifile_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) -noinst_HEADERS = gstmultifilesrc.h gstmultifilesink.h gstsplitfilesrc.h patternspec.h +noinst_HEADERS = gstmultifilesrc.h gstmultifilesink.h gstsplitfilesrc.h gstsplitmuxsink.h \ + gstsplitmuxsrc.h gstsplitmuxpartreader.h gstsplitutils.h patternspec.h +noinst_PROGRAMS = test-splitmux-part-reader + +test_splitmux_part_reader_SOURCES = \ + test-splitmuxpartreader.c \ + gstsplitmuxpartreader.c \ + gstsplitmuxsrc.c \ + gstsplitutils.c \ + patternspec.c +test_splitmux_part_reader_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GIO_CFLAGS) +test_splitmux_part_reader_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstvideo-@GST_API_VERSION@ $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS) +test_splitmux_part_reader_LDFLAGS = $(GST_PLUGIN_LDFLAGS) Android.mk: Makefile.am $(BUILT_SOURCES) androgenizer \ diff --git a/gst/multifile/gstmultifile.c b/gst/multifile/gstmultifile.c index e89dadaa6..6cbb655f5 100644 --- a/gst/multifile/gstmultifile.c +++ b/gst/multifile/gstmultifile.c @@ -31,6 +31,8 @@ #include "gstmultifilesink.h" #include "gstmultifilesrc.h" #include "gstsplitfilesrc.h" +#include "gstsplitmuxsink.h" +#include "gstsplitmuxsrc.h" static gboolean plugin_init (GstPlugin * plugin) @@ -42,6 +44,12 @@ plugin_init (GstPlugin * plugin) gst_element_register (plugin, "splitfilesrc", GST_RANK_NONE, gst_split_file_src_get_type ()); + if (!register_splitmuxsink (plugin)) + return FALSE; + + if (!register_splitmuxsrc (plugin)) + return FALSE; + return TRUE; } diff --git a/gst/multifile/gstsplitfilesrc.c b/gst/multifile/gstsplitfilesrc.c index 98621aed4..51174e470 100644 --- a/gst/multifile/gstsplitfilesrc.c +++ b/gst/multifile/gstsplitfilesrc.c @@ -45,16 +45,10 @@ #endif #include "gstsplitfilesrc.h" -#include "patternspec.h" +#include "gstsplitutils.h" #include <string.h> -#ifdef G_OS_WIN32 -#define DEFAULT_PATTERN_MATCH_MODE MATCH_MODE_UTF8 -#else -#define DEFAULT_PATTERN_MATCH_MODE MATCH_MODE_AUTO -#endif - enum { PROP_LOCATION = 1 @@ -237,84 +231,6 @@ gst_split_file_src_get_property (GObject * object, guint prop_id, } } -static int -gst_split_file_src_array_sortfunc (gchar ** a, gchar ** b) -{ - return strcmp (*a, *b); -} - -static gchar ** -gst_split_file_src_find_files (GstSplitFileSrc * src, const gchar * dirname, - const gchar * basename, GError ** err) -{ - PatternSpec *pspec; - GPtrArray *files; - const gchar *name; - GDir *dir; - - if (dirname == NULL || basename == NULL) - goto invalid_location; - - GST_INFO_OBJECT (src, "checking in directory '%s' for pattern '%s'", - dirname, basename); - - dir = g_dir_open (dirname, 0, err); - if (dir == NULL) - return NULL; - - if (DEFAULT_PATTERN_MATCH_MODE == MATCH_MODE_UTF8 && - !g_utf8_validate (basename, -1, NULL)) { - goto not_utf8; - } - - /* mode will be AUTO on linux/unix and UTF8 on win32 */ - pspec = pattern_spec_new (basename, DEFAULT_PATTERN_MATCH_MODE); - - files = g_ptr_array_new (); - - while ((name = g_dir_read_name (dir))) { - GST_TRACE_OBJECT (src, "check: %s", name); - if (pattern_match_string (pspec, name)) { - GST_DEBUG_OBJECT (src, "match: %s", name); - g_ptr_array_add (files, g_build_filename (dirname, name, NULL)); - } - } - - if (files->len == 0) - goto no_matches; - - g_ptr_array_sort (files, (GCompareFunc) gst_split_file_src_array_sortfunc); - g_ptr_array_add (files, NULL); - - pattern_spec_free (pspec); - g_dir_close (dir); - - return (gchar **) g_ptr_array_free (files, FALSE); - -/* ERRORS */ -invalid_location: - { - g_set_error_literal (err, G_FILE_ERROR, G_FILE_ERROR_INVAL, - "No filename specified."); - return NULL; - } -not_utf8: - { - g_dir_close (dir); - g_set_error_literal (err, G_FILE_ERROR, G_FILE_ERROR_INVAL, - "Filename pattern must be UTF-8 on Windows."); - return NULL; - } -no_matches: - { - pattern_spec_free (pspec); - g_dir_close (dir); - g_set_error_literal (err, G_FILE_ERROR, G_FILE_ERROR_NOENT, - "Found no files matching the pattern."); - return NULL; - } -} - static gboolean gst_split_file_src_start (GstBaseSrc * basesrc) { @@ -335,7 +251,7 @@ gst_split_file_src_start (GstBaseSrc * basesrc) } GST_OBJECT_UNLOCK (src); - files = gst_split_file_src_find_files (src, dirname, basename, &err); + files = gst_split_util_find_files (dirname, basename, &err); if (files == NULL || *files == NULL) goto no_files; diff --git a/gst/multifile/gstsplitmuxpartreader.c b/gst/multifile/gstsplitmuxpartreader.c new file mode 100644 index 000000000..5996aa055 --- /dev/null +++ b/gst/multifile/gstsplitmuxpartreader.c @@ -0,0 +1,1258 @@ +/* GStreamer Split Demuxer bin that recombines files created by + * the splitmuxsink element. + * + * Copyright (C) <2014> Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> +#include "gstsplitmuxsrc.h" + +GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug); +#define GST_CAT_DEFAULT splitmux_part_debug + +#define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock) +#define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock) +#define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock) +#define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond) + +enum +{ + SIGNAL_PREPARED, + LAST_SIGNAL +}; + +static guint part_reader_signals[LAST_SIGNAL] = { 0 }; + +typedef struct _GstSplitMuxPartPad +{ + GstPad parent; + + /* Reader we belong to */ + GstSplitMuxPartReader *reader; + /* Output splitmuxsrc source pad */ + GstPad *target; + + GstDataQueue *queue; + + gboolean is_eos; + gboolean flushing; + gboolean seen_buffer; + + GstClockTime max_ts; + GstSegment segment; + + GstSegment orig_segment; + GstClockTime initial_ts_offset; +} GstSplitMuxPartPad; + +typedef struct _GstSplitMuxPartPadClass +{ + GstPadClass parent; +} GstSplitMuxPartPadClass; + +static GType gst_splitmux_part_pad_get_type (void); +#define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type() +#define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p)) + +static void splitmux_part_pad_constructed (GObject * pad); +static void splitmux_part_pad_finalize (GObject * pad); +static void handle_buffer_measuring (GstSplitMuxPartReader * reader, + GstSplitMuxPartPad * part_pad, GstBuffer * buf); + +static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata); +static void type_found (GstElement * typefind, guint probability, + GstCaps * caps, GstSplitMuxPartReader * reader); +static void check_if_pads_collected (GstSplitMuxPartReader * reader); + +/* Called with reader lock held */ +static gboolean +have_empty_queue (GstSplitMuxPartReader * reader) +{ + GList *cur; + + for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (part_pad->is_eos) { + GST_LOG_OBJECT (part_pad, "Pad is EOS"); + return TRUE; + } + if (gst_data_queue_is_empty (part_pad->queue)) { + GST_LOG_OBJECT (part_pad, "Queue is empty"); + return TRUE; + } + } + + return FALSE; +} + +/* Called with reader lock held */ +static gboolean +block_until_can_push (GstSplitMuxPartReader * reader) +{ + while (reader->running) { + if (reader->flushing) + goto out; + if (reader->active && have_empty_queue (reader)) + goto out; + + GST_LOG_OBJECT (reader, + "Waiting for activation or empty queue on reader %s", reader->path); + SPLITMUX_PART_WAIT (reader); + } + + GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d", + reader->path, reader->active, reader->flushing); +out: + return reader->active && !reader->flushing; +} + +static void +handle_buffer_measuring (GstSplitMuxPartReader * reader, + GstSplitMuxPartPad * part_pad, GstBuffer * buf) +{ + GstClockTime ts = GST_CLOCK_TIME_NONE; + GstClockTimeDiff offset; + + if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS && + !part_pad->seen_buffer) { + /* If this is the first buffer on the pad in the collect_streams state, + * then calculate inital offset based on running time of this segment */ + part_pad->initial_ts_offset = + part_pad->orig_segment.start + part_pad->orig_segment.base - + part_pad->orig_segment.time; + GST_DEBUG_OBJECT (reader, + "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT, + part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset)); + } + part_pad->seen_buffer = TRUE; + + /* Adjust buffer timestamps */ + offset = reader->start_offset + part_pad->segment.base; + offset -= part_pad->initial_ts_offset; + + /* Update the stored max duration on the pad, + * always preferring making DTS contiguous + * where possible */ + if (GST_BUFFER_DTS_IS_VALID (buf)) + ts = GST_BUFFER_DTS (buf) + offset; + else if (GST_BUFFER_PTS_IS_VALID (buf)) + ts = GST_BUFFER_PTS (buf) + offset; + + GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT + " incoming PTS %" GST_TIME_FORMAT + " DTS %" GST_TIME_FORMAT " offset by %" GST_TIME_FORMAT + " to %" GST_TIME_FORMAT, part_pad, + GST_TIME_ARGS (GST_BUFFER_DTS (buf)), + GST_TIME_ARGS (GST_BUFFER_PTS (buf)), + GST_TIME_ARGS (offset), GST_TIME_ARGS (ts)); + + if (GST_CLOCK_TIME_IS_VALID (ts)) { + if (GST_BUFFER_DURATION_IS_VALID (buf)) + ts += GST_BUFFER_DURATION (buf); + + if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) { + part_pad->max_ts = ts; + GST_LOG_OBJECT (reader, + "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad, + GST_TIME_ARGS (part_pad->max_ts)); + } + } + /* Is it time to move to measuring state yet? */ + check_if_pads_collected (reader); +} + +static gboolean +splitmux_data_queue_is_full_cb (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata) +{ + /* Arbitrary safety limit. If we hit it, playback is likely to stall */ + if (time > 20 * GST_SECOND) + return TRUE; + return FALSE; +} + +static void +splitmux_part_free_queue_item (GstDataQueueItem * item) +{ + gst_mini_object_unref (item->object); + g_slice_free (GstDataQueueItem, item); +} + +static GstFlowReturn +splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) +{ + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad); + GstSplitMuxPartReader *reader = part_pad->reader; + GstDataQueueItem *item; + GstClockTimeDiff offset; + + GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf); + SPLITMUX_PART_LOCK (reader); + + if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS || + reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) { + handle_buffer_measuring (reader, part_pad, buf); + gst_buffer_unref (buf); + SPLITMUX_PART_UNLOCK (reader); + return GST_FLOW_OK; + } + + if (!block_until_can_push (reader)) { + /* Flushing */ + SPLITMUX_PART_UNLOCK (reader); + gst_buffer_unref (buf); + return GST_FLOW_FLUSHING; + } + + /* Adjust buffer timestamps */ + offset = reader->start_offset + part_pad->segment.base; + offset -= part_pad->initial_ts_offset; + + if (GST_BUFFER_PTS_IS_VALID (buf)) + GST_BUFFER_PTS (buf) += offset; + if (GST_BUFFER_DTS_IS_VALID (buf)) + GST_BUFFER_DTS (buf) += offset; + + /* We are active, and one queue is empty, place this buffer in + * the dataqueue */ + GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf); + item = g_slice_new (GstDataQueueItem); + item->destroy = (GDestroyNotify) splitmux_part_free_queue_item; + item->object = GST_MINI_OBJECT (buf); + item->size = gst_buffer_get_size (buf); + item->duration = GST_BUFFER_DURATION (buf); + if (item->duration == GST_CLOCK_TIME_NONE) + item->duration = 0; + item->visible = TRUE; + + gst_object_ref (part_pad); + + SPLITMUX_PART_UNLOCK (reader); + + if (!gst_data_queue_push (part_pad->queue, item)) { + splitmux_part_free_queue_item (item); + gst_object_unref (part_pad); + return GST_FLOW_FLUSHING; + } + + gst_object_unref (part_pad); + return GST_FLOW_OK; +} + +/* Called with splitmux part lock held */ +static gboolean +splitmux_part_is_eos_locked (GstSplitMuxPartReader * part) +{ + GList *cur; + for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (!part_pad->is_eos) + return FALSE; + } + + return TRUE; +} + +static gboolean +splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part) +{ + GList *cur; + GST_LOG_OBJECT (part, "Checking for preroll"); + for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (!part_pad->seen_buffer) { + GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled", + part_pad); + return FALSE; + } + } + GST_LOG_OBJECT (part, "Part is prerolled"); + return TRUE; +} + + +gboolean +gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader) +{ + gboolean res; + + SPLITMUX_PART_LOCK (reader); + res = splitmux_part_is_eos_locked (reader); + SPLITMUX_PART_UNLOCK (reader); + + return res; +} + +/* Called with splitmux part lock held */ +static gboolean +splitmux_is_flushing (GstSplitMuxPartReader * reader) +{ + GList *cur; + for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (part_pad->flushing) + return TRUE; + } + + return FALSE; +} + +static gboolean +splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad); + GstSplitMuxPartReader *reader = part_pad->reader; + gboolean ret = TRUE; + SplitMuxSrcPad *target; + GstDataQueueItem *item; + + SPLITMUX_PART_LOCK (reader); + + target = gst_object_ref (part_pad->target); + + GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad, + event); + + if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) + goto drop_event; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT:{ + GstSegment *seg = &part_pad->segment; + + GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event); + + gst_event_copy_segment (event, seg); + gst_event_copy_segment (event, &part_pad->orig_segment); + + if (seg->format != GST_FORMAT_TIME) + goto wrong_segment; + + /* Adjust segment */ + /* Adjust start/stop so the overall file is 0 + start_offset based */ + if (seg->stop != -1) { + seg->stop -= seg->start; + seg->stop += seg->time + reader->start_offset; + } + seg->start = seg->time + reader->start_offset; + seg->time += reader->start_offset; + seg->position += reader->start_offset; + + GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event); + + /* Replace event */ + gst_event_unref (event); + event = gst_event_new_segment (seg); + + if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS + && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS) + break; /* Only do further stuff with segments during initial measuring */ + + /* Take the first segment from the first part */ + if (target->segment.format == GST_FORMAT_UNDEFINED) { + gst_segment_copy_into (seg, &target->segment); + GST_DEBUG_OBJECT (reader, + "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment); + } + + if (seg->stop != -1 && target->segment.stop != -1) { + GstClockTime stop = seg->base + seg->stop; + if (stop > target->segment.stop) { + target->segment.stop = stop; + GST_DEBUG_OBJECT (reader, + "Adjusting segment stop by %" GST_TIME_FORMAT + " output now %" GST_SEGMENT_FORMAT, + GST_TIME_ARGS (reader->start_offset), &target->segment); + } + } + GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event); + break; + } + case GST_EVENT_EOS:{ + + GST_DEBUG_OBJECT (part_pad, + "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT, + reader->prep_state, GST_TIME_ARGS (part_pad->max_ts)); + + if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) { + /* Mark this pad as EOS */ + part_pad->is_eos = TRUE; + if (splitmux_part_is_eos_locked (reader)) { + /* Finished measuring things, set state and tell the state change func + * so it can seek back to the start */ + GST_LOG_OBJECT (reader, "EOS while measuring streams"); + reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY; + SPLITMUX_PART_BROADCAST (reader); + } + goto drop_event; + } + break; + } + case GST_EVENT_FLUSH_START: + reader->flushing = TRUE; + part_pad->flushing = TRUE; + GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue", + part_pad); + gst_data_queue_set_flushing (part_pad->queue, TRUE); + SPLITMUX_PART_BROADCAST (reader); + break; + case GST_EVENT_FLUSH_STOP:{ + gst_data_queue_set_flushing (part_pad->queue, FALSE); + gst_data_queue_flush (part_pad->queue); + part_pad->seen_buffer = FALSE; + part_pad->flushing = FALSE; + part_pad->is_eos = FALSE; + + reader->flushing = splitmux_is_flushing (reader); + GST_LOG_OBJECT (reader, + "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d", + reader->path, pad, reader->flushing); + SPLITMUX_PART_BROADCAST (reader); + break; + } + default: + break; + } + + /* Don't send events downstream while preparing */ + if (reader->prep_state != PART_STATE_READY) + goto drop_event; + + /* Don't pass flush events - those are done by the parent */ + if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START || + GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) + goto drop_event; + + if (!block_until_can_push (reader)) { + SPLITMUX_PART_UNLOCK (reader); + gst_object_unref (target); + gst_event_unref (event); + return FALSE; + } + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_GAP:{ + /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */ + goto drop_event; + } + default: + break; + } + + /* We are active, and one queue is empty, place this buffer in + * the dataqueue */ + gst_object_ref (part_pad->queue); + SPLITMUX_PART_UNLOCK (reader); + + GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event); + item = g_slice_new (GstDataQueueItem); + item->destroy = (GDestroyNotify) splitmux_part_free_queue_item; + item->object = GST_MINI_OBJECT (event); + item->size = 0; + item->duration = 0; + if (item->duration == GST_CLOCK_TIME_NONE) + item->duration = 0; + item->visible = FALSE; + + if (!gst_data_queue_push (part_pad->queue, item)) { + splitmux_part_free_queue_item (item); + ret = FALSE; + } + + gst_object_unref (part_pad->queue); + + return ret; +wrong_segment: + gst_event_unref (event); + SPLITMUX_PART_UNLOCK (reader); + GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL), + ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT, + reader->path, pad)); + return FALSE; +drop_event: + GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT + " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target); + gst_event_unref (event); + SPLITMUX_PART_UNLOCK (reader); + return TRUE; +} + +static gboolean +splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad); + GstSplitMuxPartReader *reader = part_pad->reader; + GstPad *target; + gboolean ret = FALSE; + gboolean active; + + SPLITMUX_PART_LOCK (reader); + target = gst_object_ref (part_pad->target); + active = reader->active; + SPLITMUX_PART_UNLOCK (reader); + + if (active) { + GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT + " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target); + + ret = gst_pad_query (target, query); + } + + gst_object_unref (target); + + return ret; +} + +G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD); + +static void +splitmux_part_pad_constructed (GObject * pad) +{ + gst_pad_set_chain_function (GST_PAD (pad), + GST_DEBUG_FUNCPTR (splitmux_part_pad_chain)); + gst_pad_set_event_function (GST_PAD (pad), + GST_DEBUG_FUNCPTR (splitmux_part_pad_event)); + gst_pad_set_query_function (GST_PAD (pad), + GST_DEBUG_FUNCPTR (splitmux_part_pad_query)); +} + +static void +gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass) +{ + GObjectClass *gobject_klass = (GObjectClass *) (klass); + + gobject_klass->constructed = splitmux_part_pad_constructed; + gobject_klass->finalize = splitmux_part_pad_finalize; +} + +static void +gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad) +{ + pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb, + NULL, NULL, pad); + gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED); +} + +static void +splitmux_part_pad_finalize (GObject * obj) +{ + GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj); + g_object_unref ((GObject *) (pad->queue)); +} + +static void +new_decoded_pad_added_cb (GstElement * element, GstPad * pad, + GstSplitMuxPartReader * part); +static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader); +static GstStateChangeReturn +gst_splitmux_part_reader_change_state (GstElement * element, + GstStateChange transition); +static gboolean gst_splitmux_part_reader_send_event (GstElement * element, + GstEvent * event); +static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader + * part, gboolean flushing); +static void bus_handler (GstBin * bin, GstMessage * msg); + +#define gst_splitmux_part_reader_parent_class parent_class +G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader, + GST_TYPE_PIPELINE); + +static void +gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass) +{ + GstElementClass *gstelement_class = (GstElementClass *) klass; + GstBinClass *gstbin_class = (GstBinClass *) klass; + + GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0, + "Split File Demuxing Source helper"); + + part_reader_signals[SIGNAL_PREPARED] = + g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass, + prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gstelement_class->change_state = gst_splitmux_part_reader_change_state; + gstelement_class->send_event = gst_splitmux_part_reader_send_event; + + gstbin_class->handle_message = bus_handler; +} + +static void +gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader) +{ + GstElement *typefind; + + reader->active = FALSE; + reader->duration = GST_CLOCK_TIME_NONE; + + g_cond_init (&reader->inactive_cond); + g_mutex_init (&reader->lock); + + /* FIXME: Create elements on a state change */ + reader->src = gst_element_factory_make ("filesrc", NULL); + if (reader->src == NULL) { + GST_ERROR_OBJECT (reader, "Failed to create filesrc element"); + return; + } + gst_bin_add (GST_BIN_CAST (reader), reader->src); + + typefind = gst_element_factory_make ("typefind", NULL); + if (!typefind) { + GST_ERROR_OBJECT (reader, + "Failed to create typefind element - check your installation"); + return; + } + + gst_bin_add (GST_BIN_CAST (reader), typefind); + reader->typefind = typefind; + + if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) { + GST_ERROR_OBJECT (reader, + "Failed to link typefind element - check your installation"); + return; + } + + g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found), + reader); +} + +static GstSplitMuxPartPad * +gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader, + GstPad * target) +{ + GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD, + "name", GST_PAD_NAME (target), + "direction", GST_PAD_SINK, + NULL); + pad->target = target; + pad->reader = reader; + + gst_pad_set_active (GST_PAD_CAST (pad), TRUE); + + return pad; +} + +static void +new_decoded_pad_added_cb (GstElement * element, GstPad * pad, + GstSplitMuxPartReader * reader) +{ + GstPad *out_pad = NULL; + GstSplitMuxPartPad *proxy_pad; + GstCaps *caps; + GstPadLinkReturn link_ret; + + caps = gst_pad_get_current_caps (pad); + + GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT + " caps %" GST_PTR_FORMAT, reader->path, pad, caps); + /* Look up or create the output pad */ + if (reader->get_pad_cb) + out_pad = reader->get_pad_cb (reader, pad, reader->cb_data); + if (out_pad == NULL) + return; + + /* Create our proxy pad to interact with this new pad */ + proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad); + GST_DEBUG_OBJECT (reader, + "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT, + proxy_pad, out_pad); + + link_ret = gst_pad_link (pad, GST_PAD (proxy_pad)); + if (link_ret != GST_PAD_LINK_OK) { + gst_object_unref (proxy_pad); + GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL), + ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT + " ret %d", reader->path, pad, link_ret)); + return; + } + GST_DEBUG_OBJECT (reader, + "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT, + pad, proxy_pad); + + SPLITMUX_PART_LOCK (reader); + reader->pads = g_list_prepend (reader->pads, proxy_pad); + SPLITMUX_PART_UNLOCK (reader); +} + +static gboolean +gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event) +{ + GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element; + gboolean ret = FALSE; + GstPad *pad = NULL; + + /* Send event to the first source pad we found */ + SPLITMUX_PART_LOCK (reader); + if (reader->pads) { + GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data); + pad = gst_pad_get_peer (proxy_pad); + } + SPLITMUX_PART_UNLOCK (reader); + + if (pad) { + ret = gst_pad_send_event (pad, event); + gst_object_unref (pad); + } else { + gst_event_unref (event); + } + + return ret; +} + +/* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */ +static void +gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader, + GstClockTime time) +{ + SPLITMUX_PART_UNLOCK (reader); + GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT, + GST_TIME_ARGS (time)); + gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME, + GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time, + GST_SEEK_TYPE_END, 0); + + SPLITMUX_PART_LOCK (reader); + + /* Wait for flush to finish, so old data is gone */ + while (reader->flushing) { + GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path); + SPLITMUX_PART_WAIT (reader); + } +} + +/* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */ +static gboolean +gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader, + GstSegment * target_seg) +{ + GstSeekFlags flags; + GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE; + + flags = target_seg->flags | GST_SEEK_FLAG_FLUSH; + + SPLITMUX_PART_LOCK (reader); + if (target_seg->start >= reader->start_offset) + start = target_seg->start - reader->start_offset; + /* If the segment stop is within this part, don't play to the end */ + if (target_seg->stop != -1 && + target_seg->stop < reader->start_offset + reader->duration) + stop = target_seg->stop - reader->start_offset; + + SPLITMUX_PART_UNLOCK (reader); + + GST_DEBUG_OBJECT (reader, + "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %" + GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags, + GST_TIME_ARGS (start), GST_TIME_ARGS (stop)); + + return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate, + target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, + stop); +} + +/* Called with lock held */ +static void +gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader) +{ + /* Trigger a flushing seek to near the end of the file and run each stream + * to EOS in order to find the smallest end timestamp to start the next + * file from + */ + if (GST_CLOCK_TIME_IS_VALID (reader->duration) + && reader->duration > GST_SECOND) { + GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND); + gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts); + } + + /* Wait for things to happen */ + while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) + SPLITMUX_PART_WAIT (reader); + + /* Seek back to the start now */ + if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) { + /* Fire the prepared signal */ + GST_DEBUG_OBJECT (reader, + "Stream measuring complete. File %s is now ready. Firing prepared signal", + reader->path); + reader->prep_state = PART_STATE_READY; + g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL); + } +} + +static GstElement * +find_demuxer (GstCaps * caps) +{ + GList *factories = + gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER, + GST_RANK_MARGINAL); + GList *compat_elements; + GstElement *e = NULL; + + if (factories == NULL) + return NULL; + + compat_elements = + gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE); + + if (compat_elements) { + /* Just take the first (highest ranked) option */ + GstElementFactory *factory = + GST_ELEMENT_FACTORY_CAST (compat_elements->data); + e = gst_element_factory_create (factory, NULL); + gst_plugin_feature_list_free (compat_elements); + } + + if (factories) + gst_plugin_feature_list_free (factories); + + return e; +} + +static void +type_found (GstElement * typefind, guint probability, + GstCaps * caps, GstSplitMuxPartReader * reader) +{ + GstElement *demux; + + GST_WARNING ("Got type %" GST_PTR_FORMAT, caps); + + /* typefind found a type. Look for the demuxer to handle it */ + demux = reader->demux = find_demuxer (caps); + if (reader->demux == NULL) { + GST_ERROR_OBJECT (reader, "Failed to create demuxer element"); + return; + } + + gst_bin_add (GST_BIN_CAST (reader), demux); + gst_element_link_pads (reader->typefind, "src", demux, NULL); + gst_element_set_state (reader->demux, GST_STATE_PLAYING); + + /* Connect to demux signals */ + g_signal_connect (demux, + "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader); + g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader); +} + +static void +check_if_pads_collected (GstSplitMuxPartReader * reader) +{ + if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) { + /* Check we have all pads and each pad has seen a buffer */ + if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) { + GST_DEBUG_OBJECT (reader, + "no more pads - file %s. Measuring stream length", reader->path); + reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS; + SPLITMUX_PART_BROADCAST (reader); + } + } +} + +static void +no_more_pads (GstElement * element, GstSplitMuxPartReader * reader) +{ + GstClockTime duration = GST_CLOCK_TIME_NONE; + GList *cur; + /* Query the minimum duration of any pad in this piece and store it. + * FIXME: Only consider audio and video */ + SPLITMUX_PART_LOCK (reader); + for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { + GstPad *target = GST_PAD_CAST (cur->data); + if (target) { + gint64 cur_duration; + if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) { + GST_INFO_OBJECT (reader, + "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT, + reader->path, target, GST_TIME_ARGS (cur_duration)); + if (cur_duration < duration) + duration = cur_duration; + } + } + } + GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT, + reader->path, GST_TIME_ARGS (duration)); + reader->duration = (GstClockTime) duration; + + reader->no_more_pads = TRUE; + + check_if_pads_collected (reader); + SPLITMUX_PART_UNLOCK (reader); +} + +gboolean +gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part, + GstPad * src_pad, GstQuery * query) +{ + GstPad *target = NULL; + gboolean ret; + GList *cur; + + SPLITMUX_PART_LOCK (part); + /* Find the pad corresponding to the visible output target pad */ + for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (part_pad->target == src_pad) { + target = gst_object_ref (GST_OBJECT_CAST (part_pad)); + break; + } + } + SPLITMUX_PART_UNLOCK (part); + + if (target == NULL) + return FALSE; + + ret = gst_pad_peer_query (target, query); + gst_object_unref (GST_OBJECT_CAST (target)); + + if (ret == FALSE) + return ret; + + /* Post-massaging of queries */ + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_POSITION:{ + GstFormat fmt; + gint64 position; + + gst_query_parse_position (query, &fmt, &position); + if (fmt != GST_FORMAT_TIME) + return FALSE; + SPLITMUX_PART_LOCK (part); + position += part->start_offset; + GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT, + GST_TIME_ARGS (position)); + SPLITMUX_PART_UNLOCK (part); + + gst_query_set_position (query, fmt, position); + break; + } + default: + break; + } + + return ret; +} + +static GstStateChangeReturn +gst_splitmux_part_reader_change_state (GstElement * element, + GstStateChange transition) +{ + GstStateChangeReturn ret; + GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY:{ + break; + } + case GST_STATE_CHANGE_READY_TO_PAUSED:{ + g_object_set (reader->src, "location", reader->path, NULL); + SPLITMUX_PART_LOCK (reader); + reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS; + gst_splitmux_part_reader_set_flushing_locked (reader, FALSE); + reader->running = TRUE; + SPLITMUX_PART_UNLOCK (reader); + break; + } + case GST_STATE_CHANGE_READY_TO_NULL: + case GST_STATE_CHANGE_PAUSED_TO_READY: + SPLITMUX_PART_LOCK (reader); + gst_splitmux_part_reader_set_flushing_locked (reader, TRUE); + reader->running = FALSE; + SPLITMUX_PART_BROADCAST (reader); + SPLITMUX_PART_UNLOCK (reader); + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + SPLITMUX_PART_LOCK (reader); + reader->active = FALSE; + gst_splitmux_part_reader_set_flushing_locked (reader, TRUE); + SPLITMUX_PART_BROADCAST (reader); + SPLITMUX_PART_UNLOCK (reader); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + if (ret == GST_STATE_CHANGE_FAILURE) + goto beach; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + /* Sleep and wait until all streams have been collected, then do the seeks + * to measure the stream lengths */ + SPLITMUX_PART_LOCK (reader); + + while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) { + GST_LOG_OBJECT (reader, "Waiting to collect all output streams"); + SPLITMUX_PART_WAIT (reader); + } + + if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) + gst_splitmux_part_reader_measure_streams (reader); + else if (reader->prep_state == PART_STATE_FAILED) + ret = GST_STATE_CHANGE_FAILURE; + SPLITMUX_PART_UNLOCK (reader); + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + SPLITMUX_PART_LOCK (reader); + gst_splitmux_part_reader_set_flushing_locked (reader, FALSE); + reader->active = TRUE; + SPLITMUX_PART_BROADCAST (reader); + SPLITMUX_PART_UNLOCK (reader); + break; + case GST_STATE_CHANGE_READY_TO_NULL: + reader->prep_state = PART_STATE_NULL; + break; + default: + break; + } + +beach: + return ret; +} + +static gboolean +check_bus_messages (GstSplitMuxPartReader * part) +{ + gboolean ret = FALSE; + GstBus *bus; + GstMessage *m; + + bus = gst_element_get_bus (GST_ELEMENT_CAST (part)); + while ((m = gst_bus_pop (bus)) != NULL) { + if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) { + GST_LOG_OBJECT (part, "Got error message while preparing. Failing."); + gst_message_unref (m); + goto done; + } + gst_message_unref (m); + } + ret = TRUE; +done: + gst_object_unref (bus); + return ret; +} + +gboolean +gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part) +{ + GstStateChangeReturn ret; + + ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED); + + if (ret != GST_STATE_CHANGE_SUCCESS) + return FALSE; + + return check_bus_messages (part); +} + +void +gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part) +{ + gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL); +} + +void +gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader, + const gchar * path) +{ + reader->path = g_strdup (path); +} + +gboolean +gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader, + GstSegment * seg) +{ + GST_DEBUG_OBJECT (reader, "Activating part reader"); + + if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) { + GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT, + seg); + return FALSE; + } + if (gst_element_set_state (GST_ELEMENT_CAST (reader), + GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { + GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING"); + return FALSE; + } + return TRUE; +} + +void +gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader) +{ + GST_DEBUG_OBJECT (reader, "Deactivating reader"); + gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED); +} + +void +gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader, + gboolean flushing) +{ + GList *cur; + + GST_LOG_OBJECT (reader, "%s dataqueues", + flushing ? "Flushing" : "Done flushing"); + for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + gst_data_queue_set_flushing (part_pad->queue, flushing); + if (flushing) + gst_data_queue_flush (part_pad->queue); + } +}; + +void +gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader, + gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb) +{ + reader->cb_data = cb_data; + reader->get_pad_cb = get_pad_cb; +} + +GstClockTime +gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader) +{ + GList *cur; + GstClockTime ret = GST_CLOCK_TIME_NONE; + + SPLITMUX_PART_LOCK (reader); + for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (part_pad->max_ts < ret) + ret = part_pad->max_ts; + } + + SPLITMUX_PART_UNLOCK (reader); + + return ret; +} + +void +gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader, + GstClockTime offset) +{ + SPLITMUX_PART_LOCK (reader); + reader->start_offset = offset; + GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT, + GST_TIME_ARGS (offset)); + SPLITMUX_PART_UNLOCK (reader); +} + +GstClockTime +gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader) +{ + GstClockTime ret = GST_CLOCK_TIME_NONE; + + SPLITMUX_PART_LOCK (reader); + ret = reader->start_offset; + SPLITMUX_PART_UNLOCK (reader); + + return ret; +} + +GstClockTime +gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader) +{ + GstClockTime dur; + + SPLITMUX_PART_LOCK (reader); + dur = reader->duration; + SPLITMUX_PART_UNLOCK (reader); + + return dur; +} + +GstPad * +gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader, + GstPad * target) +{ + GstPad *result = NULL; + GList *cur; + + SPLITMUX_PART_LOCK (reader); + for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + if (part_pad->target == target) { + result = (GstPad *) part_pad; + break; + } + } + SPLITMUX_PART_UNLOCK (reader); + + return result; +} + +GstFlowReturn +gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad, + GstDataQueueItem ** item) +{ + GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad); + GstDataQueue *q; + + /* Get one item from the appropriate dataqueue */ + SPLITMUX_PART_LOCK (reader); + if (reader->prep_state == PART_STATE_FAILED) { + SPLITMUX_PART_UNLOCK (reader); + return GST_FLOW_ERROR; + } + + q = gst_object_ref (part_pad->queue); + + /* Have to drop the lock around pop, so we can be woken up for flush */ + SPLITMUX_PART_UNLOCK (reader); + if (!gst_data_queue_pop (q, item) || (*item == NULL)) + return GST_FLOW_FLUSHING; + + SPLITMUX_PART_LOCK (reader); + + SPLITMUX_PART_BROADCAST (reader); + if (GST_IS_EVENT ((*item)->object)) { + GstEvent *e = (GstEvent *) ((*item)->object); + /* Mark this pad as EOS */ + if (GST_EVENT_TYPE (e) == GST_EVENT_EOS) + part_pad->is_eos = TRUE; + } + + SPLITMUX_PART_UNLOCK (reader); + gst_object_unref (q); + return GST_FLOW_OK; +} + +static void +bus_handler (GstBin * bin, GstMessage * message) +{ + GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin; + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ERROR: + /* Make sure to set the state to failed and wake up the listener + * on error */ + SPLITMUX_PART_LOCK (reader); + reader->prep_state = PART_STATE_FAILED; + SPLITMUX_PART_BROADCAST (reader); + SPLITMUX_PART_UNLOCK (reader); + break; + default: + break; + } + + GST_BIN_CLASS (parent_class)->handle_message (bin, message); +} diff --git a/gst/multifile/gstsplitmuxpartreader.h b/gst/multifile/gstsplitmuxpartreader.h new file mode 100644 index 000000000..b1778f271 --- /dev/null +++ b/gst/multifile/gstsplitmuxpartreader.h @@ -0,0 +1,117 @@ +/* GStreamer Split Muxed File Source - Part reader + * Copyright (C) 2014 Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifndef __GST_SPLITMUX_PART_READER_H__ +#define __GST_SPLITMUX_PART_READER_H__ + +#include <gst/gst.h> +#include <gst/base/gstdataqueue.h> + +G_BEGIN_DECLS + +#define GST_TYPE_SPLITMUX_PART_READER \ + (gst_splitmux_part_reader_get_type()) +#define GST_SPLITMUX_PART_READER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SPLITMUX_PART_READER,GstSplitMuxSrc)) +#define GST_SPLITMUX_PART_READER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SPLITMUX_PART_READER,GstSplitMuxSrcClass)) +#define GST_IS_SPLITMUX_PART_READER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SPLITMUX_PART_READER)) +#define GST_IS_SPLITMUX_PART_READER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SPLITMUX_PART_READER)) + +typedef struct _GstSplitMuxPartReader GstSplitMuxPartReader; +typedef struct _GstSplitMuxPartReaderClass GstSplitMuxPartReaderClass; +typedef struct _SplitMuxSrcPad SplitMuxSrcPad; +typedef struct _SplitMuxSrcPadClass SplitMuxSrcPadClass; + +typedef enum +{ + PART_STATE_NULL, + PART_STATE_PREPARING_COLLECT_STREAMS, + PART_STATE_PREPARING_MEASURE_STREAMS, + PART_STATE_PREPARING_RESET_FOR_READY, + PART_STATE_READY, + PART_STATE_FAILED, +} GstSplitMuxPartState; + +typedef GstPad *(*GstSplitMuxPartReaderPadCb)(GstSplitMuxPartReader *reader, GstPad *src_pad, gpointer cb_data); + +struct _GstSplitMuxPartReader +{ + GstPipeline parent; + + GstSplitMuxPartState prep_state; + + gchar *path; + + GstElement *src; + GstElement *typefind; + GstElement *demux; + + gboolean active; + gboolean running; + gboolean prepared; + gboolean flushing; + gboolean no_more_pads; + + GstClockTime duration; + GstClockTime start_offset; + + GList *pads; + + GCond inactive_cond; + GMutex lock; + + GstSplitMuxPartReaderPadCb get_pad_cb; + gpointer cb_data; +}; + +struct _GstSplitMuxPartReaderClass +{ + GstPipelineClass parent_class; + + void (*prepared) (GstSplitMuxPartReader *reader); + void (*end_of_part) (GstSplitMuxPartReader *reader); +}; + +GType gst_splitmux_part_reader_get_type (void); + +void gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader *reader, + gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb); +gboolean gst_splitmux_part_reader_prepare (GstSplitMuxPartReader *part); +void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader *part); +void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader *reader, + const gchar *path); +gboolean gst_splitmux_part_is_eos (GstSplitMuxPartReader *reader); + +gboolean gst_splitmux_part_reader_activate (GstSplitMuxPartReader *part, GstSegment *seg); +void gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader *part); + +gboolean gst_splitmux_part_reader_src_query (GstSplitMuxPartReader *part, GstPad *src_pad, GstQuery * query); +void gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader *part, GstClockTime offset); +GstClockTime gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader *part); +GstClockTime gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader *part); +GstClockTime gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader); + +GstPad *gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader *reader, GstPad *target); +GstFlowReturn gst_splitmux_part_reader_pop (GstSplitMuxPartReader *reader, GstPad *part_pad, GstDataQueueItem ** item); + +G_END_DECLS + +#endif diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c new file mode 100644 index 000000000..c6b4e4a6f --- /dev/null +++ b/gst/multifile/gstsplitmuxsink.c @@ -0,0 +1,1460 @@ +/* GStreamer Muxer bin that splits output stream by size/time + * Copyright (C) <2014> Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-splitmuxsink + * @short_description: Muxer wrapper for splitting output stream by size or time + * + * This element wraps a muxer and a sink, and starts a new file when the mux + * contents are about to cross a threshold of maximum size of maximum time, + * splitting at video keyframe boundaries. Exactly one input video stream + * is required, with as many accompanying audio and subtitle streams as + * desired. + * + * By default, it uses mp4mux and filesink, but they can be changed via + * the 'muxer' and 'sink' properties. + * + * The minimum file size is 1 GOP, however - so limits may be overrun if the + * distance between any 2 keyframes is larger than the limits. + * + * The splitting process is driven by the video stream contents, and + * the video stream must contain closed GOPs for the output file parts + * to be played individually correctly. + * + * <refsect2> + * <title>Example pipelines</title> + * |[ + * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc max-key-int=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000 + * ]| + * + * Records a video stream captured from a v4l2 device and muxes it into + * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds + * and 1MB maximum size. + * + * </refsect2> + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> +#include "gstsplitmuxsink.h" + +GST_DEBUG_CATEGORY_STATIC (splitmux_debug); +#define GST_CAT_DEFAULT splitmux_debug + +#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock) +#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock) +#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock) +#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond) + +enum +{ + PROP_0, + PROP_LOCATION, + PROP_MAX_SIZE_TIME, + PROP_MAX_SIZE_BYTES, + PROP_MUXER_OVERHEAD, + PROP_MUXER, + PROP_SINK +}; + +#define DEFAULT_MAX_SIZE_TIME 0 +#define DEFAULT_MAX_SIZE_BYTES 0 +#define DEFAULT_MUXER_OVERHEAD 0.02 +#define DEFAULT_MUXER "mp4mux" +#define DEFAULT_SINK "filesink" + +static GstStaticPadTemplate video_sink_template = +GST_STATIC_PAD_TEMPLATE ("video", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); +static GstStaticPadTemplate audio_sink_template = +GST_STATIC_PAD_TEMPLATE ("audio_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); +static GstStaticPadTemplate subtitle_sink_template = +GST_STATIC_PAD_TEMPLATE ("subtitle_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); + +static GQuark PAD_CONTEXT; + +static void +_do_init (void) +{ + PAD_CONTEXT = g_quark_from_static_string ("pad-context"); +} + +#define gst_splitmux_sink_parent_class parent_class +G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0, + _do_init ()); + +static gboolean create_elements (GstSplitMuxSink * splitmux); +static gboolean create_sink (GstSplitMuxSink * splitmux); +static void gst_splitmux_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_splitmux_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_splitmux_sink_dispose (GObject * object); +static void gst_splitmux_sink_finalize (GObject * object); + +static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name, const GstCaps * caps); +static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad); + +static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement * + element, GstStateChange transition); + +static void bus_handler (GstBin * bin, GstMessage * msg); +static void set_next_filename (GstSplitMuxSink * splitmux); +static void start_next_fragment (GstSplitMuxSink * splitmux); +static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); + +static MqStreamBuf * +mq_stream_buf_new (void) +{ + return g_slice_new0 (MqStreamBuf); +} + +static void +mq_stream_buf_free (MqStreamBuf * data) +{ + g_slice_free (MqStreamBuf, data); +} + +static void +gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstElementClass *gstelement_class = (GstElementClass *) klass; + GstBinClass *gstbin_class = (GstBinClass *) klass; + + gobject_class->set_property = gst_splitmux_sink_set_property; + gobject_class->get_property = gst_splitmux_sink_get_property; + gobject_class->dispose = gst_splitmux_sink_dispose; + gobject_class->finalize = gst_splitmux_sink_finalize; + + gst_element_class_set_static_metadata (gstelement_class, + "Split Muxing Bin", "Generic/Bin/Muxer", + "Convenience bin that muxes incoming streams into multiple time/size limited files", + "Jan Schmidt <jan@centricular.com>"); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&video_sink_template)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&audio_sink_template)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&subtitle_sink_template)); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad); + + gstbin_class->handle_message = bus_handler; + + g_object_class_install_property (gobject_class, PROP_LOCATION, + g_param_spec_string ("location", "File Output Pattern", + "Format string pattern for the location of the files to write (e.g. video%05d.mp4)", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD, + g_param_spec_double ("mux-overhead", "Muxing Overhead", + "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0, + DEFAULT_MUXER_OVERHEAD, + G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, + g_param_spec_uint64 ("max-size-time", "Max. size (ns)", + "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64, + DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, + g_param_spec_uint64 ("max-size-bytes", "Max. size bytes", + "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64, + DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MUXER, + g_param_spec_object ("muxer", "Muxer", + "The muxer element to use (NULL = default mp4mux)", + GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SINK, + g_param_spec_object ("sink", "Sink", + "The sink element (or element chain) to use (NULL = default filesink)", + GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static void +gst_splitmux_sink_init (GstSplitMuxSink * splitmux) +{ + g_mutex_init (&splitmux->lock); + g_cond_init (&splitmux->data_cond); + + splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; + splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; + splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES; +} + +static void +gst_splitmux_reset (GstSplitMuxSink * splitmux) +{ + if (splitmux->mq) + gst_bin_remove (GST_BIN (splitmux), splitmux->mq); + if (splitmux->muxer) + gst_bin_remove (GST_BIN (splitmux), splitmux->muxer); + if (splitmux->active_sink) + gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink); + + splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = + NULL; +} + +static void +gst_splitmux_sink_dispose (GObject * object) +{ + GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); + + G_OBJECT_CLASS (parent_class)->dispose (object); + + /* Calling parent dispose invalidates all child pointers */ + splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq = + NULL; +} + +static void +gst_splitmux_sink_finalize (GObject * object) +{ + GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); + g_cond_clear (&splitmux->data_cond); + if (splitmux->provided_sink) + gst_object_unref (splitmux->provided_sink); + + g_free (splitmux->location); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_splitmux_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); + + switch (prop_id) { + case PROP_LOCATION:{ + GST_OBJECT_LOCK (splitmux); + g_free (splitmux->location); + splitmux->location = g_value_dup_string (value); + GST_OBJECT_UNLOCK (splitmux); + break; + } + case PROP_MAX_SIZE_BYTES: + GST_OBJECT_LOCK (splitmux); + splitmux->threshold_bytes = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MAX_SIZE_TIME: + GST_OBJECT_LOCK (splitmux); + splitmux->threshold_time = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MUXER_OVERHEAD: + GST_OBJECT_LOCK (splitmux); + splitmux->mux_overhead = g_value_get_double (value); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_SINK: + GST_SPLITMUX_LOCK (splitmux); + if (splitmux->provided_sink) + gst_object_unref (splitmux->provided_sink); + splitmux->provided_sink = g_value_dup_object (value); + GST_SPLITMUX_UNLOCK (splitmux); + break; + case PROP_MUXER: + GST_SPLITMUX_LOCK (splitmux); + if (splitmux->provided_muxer) + gst_object_unref (splitmux->provided_muxer); + splitmux->provided_muxer = g_value_dup_object (value); + GST_SPLITMUX_UNLOCK (splitmux); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_splitmux_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); + + switch (prop_id) { + case PROP_LOCATION: + GST_OBJECT_LOCK (splitmux); + g_value_set_string (value, splitmux->location); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MAX_SIZE_BYTES: + GST_OBJECT_LOCK (splitmux); + g_value_set_uint64 (value, splitmux->threshold_bytes); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MAX_SIZE_TIME: + GST_OBJECT_LOCK (splitmux); + g_value_set_uint64 (value, splitmux->threshold_time); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_MUXER_OVERHEAD: + GST_OBJECT_LOCK (splitmux); + g_value_set_double (value, splitmux->mux_overhead); + GST_OBJECT_UNLOCK (splitmux); + break; + case PROP_SINK: + GST_SPLITMUX_LOCK (splitmux); + g_value_set_object (value, splitmux->provided_sink); + GST_SPLITMUX_UNLOCK (splitmux); + break; + case PROP_MUXER: + GST_SPLITMUX_LOCK (splitmux); + g_value_set_object (value, splitmux->provided_muxer); + GST_SPLITMUX_UNLOCK (splitmux); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstPad * +mq_sink_to_src (GstElement * mq, GstPad * sink_pad) +{ + gchar *tmp, *sinkname, *srcname; + GstPad *mq_src; + + sinkname = gst_pad_get_name (sink_pad); + tmp = sinkname + 5; + srcname = g_strdup_printf ("src_%s", tmp); + + mq_src = gst_element_get_static_pad (mq, srcname); + + g_free (sinkname); + g_free (srcname); + + return mq_src; +} + +static gboolean +get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad, + GstPad ** src_pad) +{ + GstPad *mq_sink; + GstPad *mq_src; + + /* Request a pad from multiqueue, then connect this one, then + * discover the corresponding output pad and return both */ + mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u"); + if (mq_sink == NULL) + return FALSE; + + mq_src = mq_sink_to_src (splitmux->mq, mq_sink); + if (mq_src == NULL) + goto fail; + + *sink_pad = mq_sink; + *src_pad = mq_src; + + return TRUE; + +fail: + gst_element_release_request_pad (splitmux->mq, mq_sink); + return FALSE; +} + +static MqStreamCtx * +mq_stream_ctx_new (GstSplitMuxSink * splitmux) +{ + MqStreamCtx *ctx; + + ctx = g_new0 (MqStreamCtx, 1); + g_atomic_int_set (&ctx->refcount, 1); + ctx->splitmux = splitmux; + gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); + ctx->in_running_time = ctx->out_running_time = 0; + g_queue_init (&ctx->queued_bufs); + return ctx; +} + +static void +mq_stream_ctx_free (MqStreamCtx * ctx) +{ + g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); + g_queue_clear (&ctx->queued_bufs); + g_free (ctx); +} + +static void +mq_stream_ctx_unref (MqStreamCtx * ctx) +{ + if (g_atomic_int_dec_and_test (&ctx->refcount)) + mq_stream_ctx_free (ctx); +} + +static void +mq_stream_ctx_ref (MqStreamCtx * ctx) +{ + g_atomic_int_inc (&ctx->refcount); +} + +static void +_pad_block_destroy_sink_notify (MqStreamCtx * ctx) +{ + ctx->sink_pad_block_id = 0; + mq_stream_ctx_unref (ctx); +} + +static void +_pad_block_destroy_src_notify (MqStreamCtx * ctx) +{ + ctx->src_pad_block_id = 0; + mq_stream_ctx_unref (ctx); +} + +/* Called with lock held, drops the lock to send EOS to the + * pad + */ +static void +send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) +{ + GstEvent *eos; + GstPad *pad; + + eos = gst_event_new_eos (); + pad = gst_pad_get_peer (ctx->srcpad); + + ctx->out_eos = TRUE; + + GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad); + GST_SPLITMUX_UNLOCK (splitmux); + gst_pad_send_event (pad, eos); + GST_SPLITMUX_LOCK (splitmux); + + gst_object_unref (pad); +} + +/* Called with splitmux lock held to check if this output + * context needs to sleep to wait for the release of the + * next GOP, or to send EOS to close out the current file + */ +static void +complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) +{ + do { + + GST_LOG_OBJECT (ctx->srcpad, + "Checking running time %" GST_TIME_FORMAT " against max %" + GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time), + GST_TIME_ARGS (splitmux->max_out_running_time)); + + if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE || + ctx->out_running_time < splitmux->max_out_running_time) + return; + + if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED) + return; + + if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) { + if (ctx->out_eos == FALSE) { + send_eos (splitmux, ctx); + continue; + } + } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) { + start_next_fragment (splitmux); + continue; + } + + GST_INFO_OBJECT (ctx->srcpad, + "Sleeping for running time %" + GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")", + GST_TIME_ARGS (ctx->out_running_time), + GST_TIME_ARGS (splitmux->max_out_running_time)); + ctx->out_blocked = TRUE; + /* Expand the mq if needed before sleeping */ + check_queue_length (splitmux, ctx); + GST_SPLITMUX_WAIT (splitmux); + ctx->out_blocked = FALSE; + GST_INFO_OBJECT (ctx->srcpad, + "Woken for new max running time %" GST_TIME_FORMAT, + GST_TIME_ARGS (splitmux->max_out_running_time)); + } while (1); +} + +static GstPadProbeReturn +handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) +{ + GstSplitMuxSink *splitmux = ctx->splitmux; + MqStreamBuf *buf_info = NULL; + + GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type); + + /* FIXME: Handle buffer lists, until then make it clear they won't work */ + if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { + g_warning ("Buffer list handling not implemented"); + return GST_PAD_PROBE_DROP; + } + if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { + GstEvent *event = gst_pad_probe_info_get_event (info); + + GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + gst_event_copy_segment (event, &ctx->out_segment); + break; + case GST_EVENT_FLUSH_STOP: + GST_SPLITMUX_LOCK (splitmux); + gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); + g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); + g_queue_clear (&ctx->queued_bufs); + ctx->flushing = FALSE; + GST_SPLITMUX_UNLOCK (splitmux); + break; + case GST_EVENT_FLUSH_START: + GST_SPLITMUX_LOCK (splitmux); + GST_LOG_OBJECT (pad, "Flush start"); + ctx->flushing = TRUE; + GST_SPLITMUX_BROADCAST (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); + break; + case GST_EVENT_EOS: + GST_SPLITMUX_LOCK (splitmux); + if (splitmux->state == SPLITMUX_STATE_STOPPED) + goto beach; + ctx->out_eos = TRUE; + GST_SPLITMUX_UNLOCK (splitmux); + break; + case GST_EVENT_GAP:{ + GstClockTime gap_ts; + + gst_event_parse_gap (event, &gap_ts, NULL); + if (gap_ts == GST_CLOCK_TIME_NONE) + break; + + GST_SPLITMUX_LOCK (splitmux); + + gap_ts = gst_segment_to_running_time (&ctx->out_segment, + GST_FORMAT_TIME, gap_ts); + + GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT, + GST_TIME_ARGS (gap_ts)); + + if (splitmux->state == SPLITMUX_STATE_STOPPED) + goto beach; + ctx->out_running_time = gap_ts; + complete_or_wait_on_out (splitmux, ctx); + GST_SPLITMUX_UNLOCK (splitmux); + break; + } + default: + break; + } + return GST_PAD_PROBE_PASS; + } + + /* Allow everything through until the configured next stopping point */ + GST_SPLITMUX_LOCK (splitmux); + + buf_info = g_queue_pop_tail (&ctx->queued_bufs); + if (buf_info == NULL) + /* Can only happen due to a poorly timed flush */ + goto beach; + + /* If we have popped a keyframe, decrement the queued_gop count */ + if (buf_info->keyframe && splitmux->queued_gops > 0) + splitmux->queued_gops--; + + ctx->out_running_time = buf_info->run_ts; + + GST_LOG_OBJECT (splitmux, + "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT + " size %" G_GSIZE_FORMAT, + pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size); + + complete_or_wait_on_out (splitmux, ctx); + + if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE || + splitmux->muxed_out_time < buf_info->run_ts) + splitmux->muxed_out_time = buf_info->run_ts; + + splitmux->muxed_out_bytes += buf_info->buf_size; + +#ifndef GST_DISABLE_GST_DEBUG + { + GstBuffer *buf = gst_pad_probe_info_get_buffer (info); + GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT + " run ts %" GST_TIME_FORMAT, buf, + GST_TIME_ARGS (ctx->out_running_time)); + } +#endif + + GST_SPLITMUX_UNLOCK (splitmux); + + mq_stream_buf_free (buf_info); + + return GST_PAD_PROBE_PASS; + +beach: + GST_SPLITMUX_UNLOCK (splitmux); + if (buf_info) + mq_stream_buf_free (buf_info); + return GST_PAD_PROBE_DROP; +} + +static gboolean +resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer) +{ + return gst_pad_send_event (peer, gst_event_ref (*event)); +} + +static void +restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) +{ + GstPad *peer = gst_pad_get_peer (ctx->srcpad); + + gst_pad_sticky_events_foreach (ctx->srcpad, + (GstPadStickyEventsForeachFunction) (resend_sticky), peer); + + /* Clear EOS flag */ + ctx->out_eos = FALSE; +} + +/* Called with lock held when a fragment + * reaches EOS and it is time to restart + * a new fragment + */ +static void +start_next_fragment (GstSplitMuxSink * splitmux) +{ + /* 1 change to new file */ + gst_element_set_state (splitmux->muxer, GST_STATE_NULL); + gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); + + set_next_filename (splitmux); + + gst_element_sync_state_with_parent (splitmux->active_sink); + gst_element_sync_state_with_parent (splitmux->muxer); + + g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); + + /* Switch state and go back to processing */ + splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + if (!splitmux->video_ctx->in_eos) + splitmux->max_out_running_time = splitmux->video_ctx->in_running_time; + else + splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + + /* Store the overflow parameters as the basis for the next fragment */ + splitmux->mux_start_time = splitmux->muxed_out_time; + splitmux->mux_start_bytes = splitmux->muxed_out_bytes; + + GST_DEBUG_OBJECT (splitmux, + "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT, + GST_TIME_ARGS (splitmux->max_out_running_time)); + + GST_SPLITMUX_BROADCAST (splitmux); +} + +static void +bus_handler (GstBin * bin, GstMessage * message) +{ + GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin); + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_EOS: + /* If the state is draining out the current file, drop this EOS */ + GST_SPLITMUX_LOCK (splitmux); + if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && + splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); + splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; + GST_SPLITMUX_BROADCAST (splitmux); + + gst_message_unref (message); + GST_SPLITMUX_UNLOCK (splitmux); + return; + } + GST_SPLITMUX_UNLOCK (splitmux); + break; + default: + break; + } + + GST_BIN_CLASS (parent_class)->handle_message (bin, message); +} + +/* Called with splitmux lock held */ +/* Called when entering ProcessingCompleteGop state + * Assess if mq contents overflowed the current file + * -> If yes, need to switch to new file + * -> if no, set max_out_running_time to let this GOP in and + * go to COLLECTING_GOP_START state + */ +static void +handle_gathered_gop (GstSplitMuxSink * splitmux) +{ + GList *cur; + gsize queued_bytes = 0; + GstClockTime queued_time = 0; + + /* Assess if the multiqueue contents overflowed the current file */ + for (cur = g_list_first (splitmux->contexts); + cur != NULL; cur = g_list_next (cur)) { + MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); + if (tmpctx->in_running_time > queued_time) + queued_time = tmpctx->in_running_time; + queued_bytes += tmpctx->in_bytes; + } + + g_assert (queued_bytes >= splitmux->mux_start_bytes); + g_assert (queued_time >= splitmux->mux_start_time); + + queued_bytes -= splitmux->mux_start_bytes; + queued_time -= splitmux->mux_start_time; + + /* Expand queued bytes estimate by muxer overhead */ + queued_bytes += (queued_bytes * splitmux->mux_overhead); + + GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT + " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes); + + /* Check for overrun - have we output at least one byte and overrun + * either threshold? */ + if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) && + ((splitmux->threshold_bytes > 0 && + queued_bytes >= splitmux->threshold_bytes) || + (splitmux->threshold_time > 0 && + queued_time >= splitmux->threshold_time))) { + + splitmux->state = SPLITMUX_STATE_ENDING_FILE; + + GST_INFO_OBJECT (splitmux, + "mq overflowed since last, draining out. max out TS is %" + GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_SPLITMUX_BROADCAST (splitmux); + + } else { + /* No overflow */ + GST_LOG_OBJECT (splitmux, + "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT + " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.", + splitmux->muxed_out_bytes - splitmux->mux_start_bytes, + queued_bytes, GST_TIME_ARGS (queued_time)); + + /* Wake everyone up to push this one GOP, then sleep */ + splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + if (!splitmux->video_ctx->in_eos) + splitmux->max_out_running_time = splitmux->video_ctx->in_running_time; + else + splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + + GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" + GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_SPLITMUX_BROADCAST (splitmux); + } + +} + +/* Called with splitmux lock held */ +/* Called from each input pad when it is has all the pieces + * for a GOP or EOS, starting with the video pad which has set the + * splitmux->max_in_running_time + */ +static void +check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) +{ + GList *cur; + gboolean ready = TRUE; + + if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { + /* Iterate each pad, and check that the input running time is at least + * up to the video runnning time, and if so handle the collected GOP */ + GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx); + for (cur = g_list_first (splitmux->contexts); + cur != NULL; cur = g_list_next (cur)) { + MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); + + GST_LOG_OBJECT (splitmux, + "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT + " EOS %d", tmpctx, tmpctx->srcpad, + GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); + + if (tmpctx->in_running_time < splitmux->max_in_running_time && + !tmpctx->in_eos) { + GST_LOG_OBJECT (splitmux, + "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep", + tmpctx, tmpctx->srcpad); + ready = FALSE; + break; + } + } + if (ready) { + GST_DEBUG_OBJECT (splitmux, + "Collected GOP is complete. Processing (ctx %p)", ctx); + /* All pads have a complete GOP, release it into the multiqueue */ + handle_gathered_gop (splitmux); + } + } + + /* Some pad is not yet ready, or GOP is being pushed + * either way, sleep and wait to get woken */ + while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE || + splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) && + !ctx->flushing) { + + GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)", + splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ? + "GOP complete" : "EOF draining", ctx); + GST_SPLITMUX_WAIT (splitmux); + + GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx); + } +} + +static void +check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) +{ + GList *cur; + guint cur_len = g_queue_get_length (&ctx->queued_bufs); + + GST_DEBUG_OBJECT (ctx->sinkpad, + "Checking queue length len %u cur_max %u queued gops %u", + cur_len, splitmux->mq_max_buffers, splitmux->queued_gops); + + if (cur_len >= splitmux->mq_max_buffers) { + gboolean allow_grow = FALSE; + + /* If collecting a GOP and this pad might block, + * and there isn't already a pending GOP in the queue + * then grow + */ + if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE && + ctx->in_running_time < splitmux->max_in_running_time && + splitmux->queued_gops <= 1) { + allow_grow = TRUE; + } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START && + ctx->is_video) { + allow_grow = TRUE; + } + + if (!allow_grow) { + for (cur = g_list_first (splitmux->contexts); + cur != NULL; cur = g_list_next (cur)) { + MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); + GST_DEBUG_OBJECT (tmpctx->sinkpad, + " len %u out_blocked %d", + g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked); + /* If another stream is starving, grow */ + if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) { + allow_grow = TRUE; + } + } + } + + if (allow_grow) { + splitmux->mq_max_buffers = cur_len + 1; + + GST_INFO_OBJECT (splitmux, + "Multiqueue overrun - enlarging to %u buffers ctx %p", + splitmux->mq_max_buffers, ctx); + + g_object_set (splitmux->mq, "max-size-buffers", + splitmux->mq_max_buffers, NULL); + } + } +} + +static GstPadProbeReturn +handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) +{ + GstSplitMuxSink *splitmux = ctx->splitmux; + GstBuffer *buf; + MqStreamBuf *buf_info; + GstClockTime ts; + gboolean loop_again; + gboolean keyframe = FALSE; + + GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type); + + /* FIXME: Handle buffer lists, until then make it clear they won't work */ + if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { + g_warning ("Buffer list handling not implemented"); + return GST_PAD_PROBE_DROP; + } + if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) { + GstEvent *event = gst_pad_probe_info_get_event (info); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + gst_event_copy_segment (event, &ctx->in_segment); + break; + case GST_EVENT_FLUSH_STOP: + GST_SPLITMUX_LOCK (splitmux); + gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); + ctx->in_eos = FALSE; + ctx->in_bytes = 0; + ctx->in_running_time = 0; + GST_SPLITMUX_UNLOCK (splitmux); + break; + case GST_EVENT_EOS: + GST_SPLITMUX_LOCK (splitmux); + ctx->in_eos = TRUE; + + if (splitmux->state == SPLITMUX_STATE_STOPPED) + goto beach; + + if (ctx->is_video) { + GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up"); + /* Act as if this is a new keyframe with infinite timestamp */ + splitmux->max_in_running_time = GST_CLOCK_TIME_NONE; + splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + /* Wake up other input pads to collect this GOP */ + GST_SPLITMUX_BROADCAST (splitmux); + check_completed_gop (splitmux, ctx); + } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { + /* If we are waiting for a GOP to be completed (ie, for aux + * pads to catch up), then this pad is complete, so check + * if the whole GOP is. + */ + check_completed_gop (splitmux, ctx); + } + GST_SPLITMUX_UNLOCK (splitmux); + break; + default: + break; + } + return GST_PAD_PROBE_PASS; + } + + buf = gst_pad_probe_info_get_buffer (info); + ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment, + GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf)); + buf_info = mq_stream_buf_new (); + + if (GST_BUFFER_PTS_IS_VALID (buf)) + ts = GST_BUFFER_PTS (buf); + else + ts = GST_BUFFER_DTS (buf); + + GST_SPLITMUX_LOCK (splitmux); + + if (splitmux->state == SPLITMUX_STATE_STOPPED) + goto beach; + + /* If this buffer has a timestamp, advance the input timestamp of the + * stream */ + if (GST_CLOCK_TIME_IS_VALID (ts)) { + GstClockTime running_time = + gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, + GST_BUFFER_TIMESTAMP (buf)); + + if (GST_CLOCK_TIME_IS_VALID (running_time) && + (ctx->in_running_time == GST_CLOCK_TIME_NONE + || running_time > ctx->in_running_time)) + ctx->in_running_time = running_time; + } + + /* Try to make sure we have a valid running time */ + if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) { + ctx->in_running_time = + gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, + ctx->in_segment.start); + } + + buf_info->run_ts = ctx->in_running_time; + buf_info->buf_size = gst_buffer_get_size (buf); + + /* Update total input byte counter for overflow detect */ + ctx->in_bytes += buf_info->buf_size; + + GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT + " total in_bytes %" G_GSIZE_FORMAT, + GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes); + + loop_again = TRUE; + do { + if (ctx->flushing) + break; + + switch (splitmux->state) { + case SPLITMUX_STATE_COLLECTING_GOP_START: + if (ctx->is_video) { + /* If a keyframe, we have a complete GOP */ + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || + !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) || + splitmux->max_in_running_time >= ctx->in_running_time) { + /* Pass this buffer through */ + loop_again = FALSE; + break; + } + GST_INFO_OBJECT (pad, + "Have keyframe with running time %" GST_TIME_FORMAT, + GST_TIME_ARGS (ctx->in_running_time)); + keyframe = TRUE; + splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + splitmux->max_in_running_time = ctx->in_running_time; + /* Wake up other input pads to collect this GOP */ + GST_SPLITMUX_BROADCAST (splitmux); + check_completed_gop (splitmux, ctx); + } else { + /* We're still waiting for a keyframe on the video pad, sleep */ + GST_LOG_OBJECT (pad, "Sleeping for GOP start"); + GST_SPLITMUX_WAIT (splitmux); + GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d", + splitmux->state); + } + break; + case SPLITMUX_STATE_WAITING_GOP_COMPLETE: + /* After a GOP start is found, this buffer might complete the GOP */ + /* If we overran the target timestamp, it might be time to process + * the GOP, otherwise bail out for more data + */ + GST_LOG_OBJECT (pad, + "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT, + GST_TIME_ARGS (ctx->in_running_time), + GST_TIME_ARGS (splitmux->max_in_running_time)); + + if (ctx->in_running_time < splitmux->max_in_running_time) { + loop_again = FALSE; + break; + } + + GST_LOG_OBJECT (pad, + "Collected last packet of GOP. Checking other pads"); + check_completed_gop (splitmux, ctx); + break; + case SPLITMUX_STATE_ENDING_FILE: + case SPLITMUX_STATE_START_NEXT_FRAGMENT: + /* A fragment is ending, wait until that's done before continuing */ + GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart"); + GST_SPLITMUX_WAIT (splitmux); + GST_DEBUG_OBJECT (pad, + "Done sleeping for fragment restart state now %d", splitmux->state); + break; + default: + loop_again = FALSE; + break; + } + } while (loop_again); + + if (keyframe) { + splitmux->queued_gops++; + buf_info->keyframe = TRUE; + } + + /* Now add this buffer to the queue just before returning */ + g_queue_push_head (&ctx->queued_bufs, buf_info); + + /* Check the buffer will fit in the mq */ + check_queue_length (splitmux, ctx); + + GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT + " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time)); + +beach: + GST_SPLITMUX_UNLOCK (splitmux); + + return GST_PAD_PROBE_PASS; +} + +static GstPad * +gst_splitmux_sink_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name, const GstCaps * caps) +{ + GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; + GstPadTemplate *mux_template = NULL; + GstPad *res = NULL; + GstPad *mq_sink, *mq_src; + gchar *gname; + gboolean is_video = FALSE; + MqStreamCtx *ctx; + + GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name); + + GST_SPLITMUX_LOCK (splitmux); + if (!create_elements (splitmux)) + goto fail; + + if (templ->name_template) { + if (g_str_equal (templ->name_template, "video")) { + /* FIXME: Look for a pad template with matching caps, rather than by name */ + mux_template = + gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS + (splitmux->muxer), "video_%u"); + is_video = TRUE; + name = NULL; + } else { + mux_template = + gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS + (splitmux->muxer), templ->name_template); + } + } + + res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps); + if (res == NULL) + goto fail; + + if (is_video) + gname = g_strdup ("video"); + else if (name == NULL) + gname = gst_pad_get_name (res); + else + gname = g_strdup (name); + + if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) { + gst_element_release_request_pad (splitmux->muxer, res); + goto fail; + } + + if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) { + gst_element_release_request_pad (splitmux->muxer, res); + gst_element_release_request_pad (splitmux->mq, mq_sink); + goto fail; + } + + ctx = mq_stream_ctx_new (splitmux); + ctx->is_video = is_video; + ctx->srcpad = mq_src; + ctx->sinkpad = mq_sink; + + mq_stream_ctx_ref (ctx); + ctx->src_pad_block_id = + gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, + (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify) + _pad_block_destroy_src_notify); + if (is_video) + splitmux->video_ctx = ctx; + + res = gst_ghost_pad_new (gname, mq_sink); + g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); + + mq_stream_ctx_ref (ctx); + ctx->sink_pad_block_id = + gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, + (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify) + _pad_block_destroy_sink_notify); + + GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT + " is mq pad %" GST_PTR_FORMAT, res, mq_sink); + + splitmux->contexts = g_list_prepend (splitmux->contexts, ctx); + + g_free (gname); + + gst_object_unref (mq_sink); + gst_object_unref (mq_src); + + gst_pad_set_active (res, TRUE); + gst_element_add_pad (element, res); + GST_SPLITMUX_UNLOCK (splitmux); + + return res; +fail: + GST_SPLITMUX_UNLOCK (splitmux); + return NULL; +} + +static void +gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) +{ + GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; + GstPad *mqsink, *mqsrc, *muxpad; + MqStreamCtx *ctx = + (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT)); + + GST_SPLITMUX_LOCK (splitmux); + + if (splitmux->muxer == NULL || splitmux->mq == NULL) + goto fail; /* Elements don't exist yet - nothing to release */ + + GST_INFO_OBJECT (pad, "releasing request pad"); + + mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad)); + mqsrc = mq_sink_to_src (splitmux->mq, mqsink); + muxpad = gst_pad_get_peer (mqsrc); + + /* Remove the context from our consideration */ + splitmux->contexts = g_list_remove (splitmux->contexts, ctx); + + if (ctx->sink_pad_block_id) + gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id); + + if (ctx->src_pad_block_id) + gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id); + + /* Can release the context now */ + mq_stream_ctx_unref (ctx); + + /* Release and free the mq input */ + gst_element_release_request_pad (splitmux->mq, mqsink); + + /* Release and free the muxer input */ + gst_element_release_request_pad (splitmux->muxer, muxpad); + + gst_object_unref (mqsink); + gst_object_unref (mqsrc); + gst_object_unref (muxpad); + + gst_element_remove_pad (element, pad); + +fail: + GST_SPLITMUX_UNLOCK (splitmux); +} + +static GstElement * +create_element (GstSplitMuxSink * splitmux, + const gchar * factory, const gchar * name) +{ + GstElement *ret = gst_element_factory_make (factory, name); + if (ret == NULL) { + g_warning ("Failed to create %s - splitmuxsink will not work", name); + return NULL; + } + + if (!gst_bin_add (GST_BIN (splitmux), ret)) { + g_warning ("Could not add %s element - splitmuxsink will not work", name); + gst_object_unref (ret); + return NULL; + } + + return ret; +} + +static gboolean +create_elements (GstSplitMuxSink * splitmux) +{ + /* Create internal elements */ + if (splitmux->mq == NULL) { + if ((splitmux->mq = + create_element (splitmux, "multiqueue", "multiqueue")) == NULL) + goto fail; + + splitmux->mq_max_buffers = 5; + /* No bytes or time limit, we limit buffers manually */ + g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time", + (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL); + } + + if (splitmux->muxer == NULL) { + if (splitmux->provided_muxer == NULL) { + if ((splitmux->muxer = + create_element (splitmux, "mp4mux", "muxer")) == NULL) + goto fail; + } else { + splitmux->muxer = splitmux->provided_muxer; + if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) { + g_warning ("Could not add muxer element - splitmuxsink will not work"); + goto fail; + } + } + } + + return TRUE; +fail: + return FALSE; +} + +static GstElement * +find_sink (GstElement * e) +{ + GstElement *res = NULL; + GstIterator *iter; + gboolean done = FALSE; + GValue data = { 0, }; + + if (!GST_IS_BIN (e)) + return e; + + iter = gst_bin_iterate_sinks (GST_BIN (e)); + while (!done) { + switch (gst_iterator_next (iter, &data)) { + case GST_ITERATOR_OK: + { + GstElement *child = g_value_get_object (&data); + if (g_object_class_find_property (G_OBJECT_GET_CLASS (child), + "location") != NULL) { + res = child; + done = TRUE; + } + g_value_reset (&data); + break; + } + case GST_ITERATOR_RESYNC: + gst_iterator_resync (iter); + break; + case GST_ITERATOR_DONE: + done = TRUE; + break; + case GST_ITERATOR_ERROR: + g_assert_not_reached (); + break; + } + } + g_value_unset (&data); + gst_iterator_free (iter); + + return res; +} + +static gboolean +create_sink (GstSplitMuxSink * splitmux) +{ + g_return_val_if_fail (splitmux->active_sink == NULL, TRUE); + + if (splitmux->provided_sink == NULL) { + if ((splitmux->sink = + create_element (splitmux, DEFAULT_SINK, "sink")) == NULL) + goto fail; + splitmux->active_sink = splitmux->sink; + } else { + if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) { + g_warning ("Could not add sink elements - splitmuxsink will not work"); + goto fail; + } + + splitmux->active_sink = splitmux->provided_sink; + + /* Find the sink element */ + splitmux->sink = find_sink (splitmux->active_sink); + if (splitmux->sink == NULL) { + g_warning + ("Could not locate sink element in provided sink - splitmuxsink will not work"); + goto fail; + } + } + + if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { + g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); + goto fail; + } + + return TRUE; +fail: + return FALSE; +} + +#ifdef __GNUC__ +#pragma GCC diagnostic ignored "-Wformat-nonliteral" +#endif +static void +set_next_filename (GstSplitMuxSink * splitmux) +{ + if (splitmux->location) { + gchar *fname; + + fname = g_strdup_printf (splitmux->location, splitmux->fragment_id); + GST_INFO_OBJECT (splitmux, "Setting file to %s", fname); + g_object_set (splitmux->sink, "location", fname, NULL); + g_free (fname); + + splitmux->fragment_id++; + } +} + +static GstStateChangeReturn +gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY:{ + GST_SPLITMUX_LOCK (splitmux); + if (!create_elements (splitmux) || !create_sink (splitmux)) { + ret = GST_STATE_CHANGE_FAILURE; + GST_SPLITMUX_UNLOCK (splitmux); + goto beach; + } + GST_SPLITMUX_UNLOCK (splitmux); + splitmux->fragment_id = 0; + set_next_filename (splitmux); + break; + } + case GST_STATE_CHANGE_READY_TO_PAUSED:{ + GST_SPLITMUX_LOCK (splitmux); + /* Start by collecting one input on each pad */ + splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + splitmux->max_in_running_time = 0; + splitmux->muxed_out_time = splitmux->mux_start_time = 0; + splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; + GST_SPLITMUX_UNLOCK (splitmux); + break; + } + case GST_STATE_CHANGE_PAUSED_TO_READY: + case GST_STATE_CHANGE_READY_TO_NULL: + GST_SPLITMUX_LOCK (splitmux); + splitmux->state = SPLITMUX_STATE_STOPPED; + /* Wake up any blocked threads */ + GST_LOG_OBJECT (splitmux, + "State change -> NULL or READY. Waking threads"); + GST_SPLITMUX_BROADCAST (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + if (ret == GST_STATE_CHANGE_FAILURE) + goto beach; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_NULL: + GST_SPLITMUX_LOCK (splitmux); + splitmux->fragment_id = 0; + gst_splitmux_reset (splitmux); + GST_SPLITMUX_UNLOCK (splitmux); + break; + default: + break; + } + +beach: + + if (transition == GST_STATE_CHANGE_NULL_TO_READY && + ret == GST_STATE_CHANGE_FAILURE) { + /* Cleanup elements on failed transition out of NULL */ + gst_splitmux_reset (splitmux); + } + return ret; +} + +gboolean +register_splitmuxsink (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0, + "Split File Muxing Sink"); + + return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE, + GST_TYPE_SPLITMUX_SINK); +} diff --git a/gst/multifile/gstsplitmuxsink.h b/gst/multifile/gstsplitmuxsink.h new file mode 100644 index 000000000..f38376733 --- /dev/null +++ b/gst/multifile/gstsplitmuxsink.h @@ -0,0 +1,132 @@ +/* GStreamer split muxer bin + * Copyright (C) 2014 Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SPLITMUXSINK_H__ +#define __GST_SPLITMUXSINK_H__ + +#include <gst/gst.h> +#include <gst/pbutils/pbutils.h> + +G_BEGIN_DECLS + +#define GST_TYPE_SPLITMUX_SINK (gst_splitmux_sink_get_type()) +#define GST_SPLITMUX_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSink)) +#define GST_SPLITMUX_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSinkClass)) +#define GST_IS_SPLITMUX_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SPLITMUX_SINK)) +#define GST_IS_SPLITMUX_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SPLITMUX_SINK)) + +typedef struct _GstSplitMuxSink GstSplitMuxSink; +typedef struct _GstSplitMuxSinkClass GstSplitMuxSinkClass; + +GType gst_splitmux_sink_get_type(void); +gboolean register_splitmuxsink (GstPlugin * plugin); + +typedef enum _SplitMuxState { + SPLITMUX_STATE_STOPPED, + SPLITMUX_STATE_COLLECTING_GOP_START, + SPLITMUX_STATE_WAITING_GOP_COMPLETE, + SPLITMUX_STATE_ENDING_FILE, + SPLITMUX_STATE_START_NEXT_FRAGMENT, +} SplitMuxState; + +typedef struct _MqStreamBuf +{ + gboolean keyframe; + GstClockTime run_ts; + gsize buf_size; +} MqStreamBuf; + +typedef struct _MqStreamCtx +{ + gint refcount; + + GstSplitMuxSink *splitmux; + + guint sink_pad_block_id; + guint src_pad_block_id; + + gboolean is_video; + + gboolean flushing; + gboolean in_eos; + gboolean out_eos; + + GstSegment in_segment; + GstSegment out_segment; + + GstClockTime in_running_time; + GstClockTime out_running_time; + + gsize in_bytes; + + GQueue queued_bufs; + + GstPad *sinkpad; + GstPad *srcpad; + + gboolean out_blocked; +} MqStreamCtx; + +struct _GstSplitMuxSink { + GstBin parent; + + GMutex lock; + GCond data_cond; + + SplitMuxState state; + gdouble mux_overhead; + + GstClockTime threshold_time; + guint64 threshold_bytes; + + guint mq_max_buffers; + + GstElement *mq; + GstElement *muxer; + GstElement *sink; + + GstElement *provided_muxer; + + GstElement *provided_sink; + GstElement *active_sink; + + gchar *location; + guint fragment_id; + + GList *contexts; + + MqStreamCtx *video_ctx; + guint queued_gops; + GstClockTime max_in_running_time; + GstClockTime max_out_running_time; + + GstClockTime muxed_out_time; + gsize muxed_out_bytes; + + GstClockTime mux_start_time; + gsize mux_start_bytes; +}; + +struct _GstSplitMuxSinkClass { + GstBinClass parent_class; +}; + +G_END_DECLS + +#endif /* __GST_SPLITMUXSINK_H__ */ diff --git a/gst/multifile/gstsplitmuxsrc.c b/gst/multifile/gstsplitmuxsrc.c new file mode 100644 index 000000000..307dd6ad7 --- /dev/null +++ b/gst/multifile/gstsplitmuxsrc.c @@ -0,0 +1,1118 @@ +/* GStreamer Split Demuxer bin that recombines files created by + * the splitmuxsink element. + * + * Copyright (C) <2014> Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-splitmuxsrc + * @short_description: Split Demuxer bin that recombines files created by + * the splitmuxsink element. + * + * This element reads a set of input files created by the splitmuxsink element + * containing contiguous elementary streams split across multiple files. + * + * This element is similar to splitfilesrc, except that it recombines the + * streams in each file part at the demuxed elementary level, rather than + * as a single larger bytestream. + * + * <refsect2> + * <title>Example pipelines</title> + * |[ + * gst-launch-1.0 splitmuxsrc location=video*.mov ! decodebin ! xvimagesink + * ]| Demux each file part and output the video stream as one continuous stream + * |[ + * gst-launch-1.0 playbin uri="splitmux://path/to/foo.mp4.*" + * ]| Play back a set of files created by splitmuxsink + * + * </refsect2> + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> +#include "gstsplitmuxsrc.h" +#include "gstsplitutils.h" + +#include "../../gst-libs/gst/gst-i18n-plugin.h" + +GST_DEBUG_CATEGORY (splitmux_debug); +#define GST_CAT_DEFAULT splitmux_debug + +enum +{ + PROP_0, + PROP_LOCATION +}; + +static GstStaticPadTemplate video_src_template = +GST_STATIC_PAD_TEMPLATE ("video", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate audio_src_template = +GST_STATIC_PAD_TEMPLATE ("audio_%u", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate subtitle_src_template = +GST_STATIC_PAD_TEMPLATE ("subtitle_%u", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +static GstStateChangeReturn gst_splitmux_src_change_state (GstElement * + element, GstStateChange transition); +static void gst_splitmux_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_splitmux_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_splitmux_src_dispose (GObject * object); +static void gst_splitmux_src_finalize (GObject * object); +static gboolean gst_splitmux_src_start (GstSplitMuxSrc * splitmux); +static gboolean gst_splitmux_src_stop (GstSplitMuxSrc * splitmux); +static void splitmux_src_pad_constructed (GObject * pad); +static gboolean splitmux_src_pad_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean splitmux_src_pad_query (GstPad * pad, GstObject * parent, + GstQuery * query); +static void splitmux_src_uri_handler_init (gpointer g_iface, + gpointer iface_data); + + +static GstPad *gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, + GstPad * pad, GstSplitMuxSrc * splitmux); +static void gst_splitmux_part_prepared (GstSplitMuxPartReader * reader, + GstSplitMuxSrc * splitmux); +static gboolean gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, + SplitMuxSrcPad * pad); +static gboolean gst_splitmux_check_new_caps (SplitMuxSrcPad * splitpad, + GstEvent * event); + +#define _do_init \ + G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init); +#define gst_splitmux_src_parent_class parent_class + +G_DEFINE_TYPE_EXTENDED (GstSplitMuxSrc, gst_splitmux_src, GST_TYPE_BIN, 0, + _do_init); + +static GstURIType +splitmux_src_uri_get_type (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +splitmux_src_uri_get_protocols (GType type) +{ + static const gchar *protocols[] = { "splitmux", NULL }; + + return protocols; +} + +static gchar * +splitmux_src_uri_get_uri (GstURIHandler * handler) +{ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (handler); + gchar *ret = NULL; + + GST_OBJECT_LOCK (splitmux); + if (splitmux->location) + ret = g_strdup_printf ("splitmux://%s", splitmux->location); + GST_OBJECT_UNLOCK (splitmux); + return ret; +} + +static gboolean +splitmux_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, + GError ** err) +{ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (handler); + gchar *protocol, *location; + + protocol = gst_uri_get_protocol (uri); + if (protocol == NULL || !g_str_equal (protocol, "splitmux")) + goto wrong_uri; + g_free (protocol); + + location = gst_uri_get_location (uri); + GST_OBJECT_LOCK (splitmux); + g_free (splitmux->location); + splitmux->location = location; + GST_OBJECT_UNLOCK (splitmux); + + return TRUE; + +wrong_uri: + g_free (protocol); + GST_ELEMENT_ERROR (splitmux, RESOURCE, READ, (NULL), + ("Error parsing uri %s", uri)); + g_set_error_literal (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "Could not parse splitmux URI"); + return FALSE; +} + +static void +splitmux_src_uri_handler_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) (g_iface); + + iface->get_type = splitmux_src_uri_get_type; + iface->get_protocols = splitmux_src_uri_get_protocols; + iface->set_uri = splitmux_src_uri_set_uri; + iface->get_uri = splitmux_src_uri_get_uri; +} + + +static void +gst_splitmux_src_class_init (GstSplitMuxSrcClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstElementClass *gstelement_class = (GstElementClass *) klass; + + gobject_class->set_property = gst_splitmux_src_set_property; + gobject_class->get_property = gst_splitmux_src_get_property; + gobject_class->dispose = gst_splitmux_src_dispose; + gobject_class->finalize = gst_splitmux_src_finalize; + + gst_element_class_set_static_metadata (gstelement_class, + "Split File Demuxing Bin", "Generic/Bin/Demuxer", + "Source that reads a set of files created by splitmuxsink", + "Jan Schmidt <jan@centricular.com>"); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&video_src_template)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&audio_src_template)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&subtitle_src_template)); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_splitmux_src_change_state); + + g_object_class_install_property (gobject_class, PROP_LOCATION, + g_param_spec_string ("location", "File Input Pattern", + "Glob pattern for the location of the files to read", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static void +gst_splitmux_src_init (GstSplitMuxSrc * splitmux) +{ + g_mutex_init (&splitmux->lock); + g_mutex_init (&splitmux->pads_lock); + splitmux->total_duration = GST_CLOCK_TIME_NONE; + gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME); +} + +static void +gst_splitmux_src_dispose (GObject * object) +{ +} + +static void +gst_splitmux_src_finalize (GObject * object) +{ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (object); + g_mutex_clear (&splitmux->lock); + g_mutex_clear (&splitmux->pads_lock); +} + +static void +gst_splitmux_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (object); + + switch (prop_id) { + case PROP_LOCATION:{ + GST_OBJECT_LOCK (splitmux); + g_free (splitmux->location); + splitmux->location = g_value_dup_string (value); + GST_OBJECT_UNLOCK (splitmux); + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_splitmux_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (object); + + switch (prop_id) { + case PROP_LOCATION: + GST_OBJECT_LOCK (splitmux); + g_value_set_string (value, splitmux->location); + GST_OBJECT_UNLOCK (splitmux); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_splitmux_src_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstSplitMuxSrc *splitmux = (GstSplitMuxSrc *) element; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY:{ + break; + } + case GST_STATE_CHANGE_READY_TO_PAUSED:{ + if (!gst_splitmux_src_start (splitmux)) + return GST_STATE_CHANGE_FAILURE; + break; + } + case GST_STATE_CHANGE_PAUSED_TO_READY: + case GST_STATE_CHANGE_READY_TO_NULL: + if (!gst_splitmux_src_stop (splitmux)) + return GST_STATE_CHANGE_FAILURE; + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + if (ret == GST_STATE_CHANGE_FAILURE) + goto beach; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + +beach: + return ret; +} + +static GstSplitMuxPartReader * +gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename) +{ + GstSplitMuxPartReader *r; + + r = g_object_new (GST_TYPE_SPLITMUX_PART_READER, NULL); + + g_signal_connect (r, "prepared", (GCallback) gst_splitmux_part_prepared, + splitmux); + + gst_splitmux_part_reader_set_callbacks (r, splitmux, + (GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad); + gst_splitmux_part_reader_set_location (r, filename); + + return r; +} + +static gboolean +resend_sticky (GstPad * pad, GstEvent ** event, GstPad * target) +{ + switch (GST_EVENT_TYPE (*event)) { + case GST_EVENT_CAPS: + if (!gst_splitmux_check_new_caps (SPLITMUX_SRC_PAD (target), *event)) + return TRUE; + return gst_pad_push_event (target, gst_event_ref (*event)); + case GST_EVENT_STREAM_START: + return gst_pad_push_event (target, gst_event_ref (*event)); + default: + return TRUE; + } + + return TRUE; +} + +static gboolean +gst_splitmux_check_new_caps (SplitMuxSrcPad * splitpad, GstEvent * event) +{ + GstCaps *curcaps = gst_pad_get_current_caps ((GstPad *) (splitpad)); + GstCaps *newcaps; + GstCaps *tmpcaps; + GstCaps *tmpcurcaps; + + GstStructure *s; + gboolean res = TRUE; + + gst_event_parse_caps (event, &newcaps); + + GST_LOG_OBJECT (splitpad, "Comparing caps %" GST_PTR_FORMAT + " and %" GST_PTR_FORMAT, curcaps, newcaps); + + if (curcaps == NULL) + return TRUE; + + /* If caps are exactly equal exit early */ + if (gst_caps_is_equal (curcaps, newcaps)) { + gst_caps_unref (curcaps); + return FALSE; + } + + /* More extensive check, ignore changes in framerate, because + * demuxers get that wrong */ + tmpcaps = gst_caps_copy (newcaps); + s = gst_caps_get_structure (tmpcaps, 0); + gst_structure_remove_field (s, "framerate"); + + tmpcurcaps = gst_caps_copy (curcaps); + gst_caps_unref (curcaps); + s = gst_caps_get_structure (tmpcurcaps, 0); + gst_structure_remove_field (s, "framerate"); + + /* Now check if these filtered caps are equal */ + if (gst_caps_is_equal (tmpcurcaps, tmpcaps)) { + GST_INFO_OBJECT (splitpad, "Ignoring framerate-only caps change"); + res = FALSE; + } + + gst_caps_unref (tmpcaps); + gst_caps_unref (tmpcurcaps); + return res; +} + +static void +gst_splitmux_handle_event (GstSplitMuxSrc * splitmux, + SplitMuxSrcPad * splitpad, GstPad * part_pad, GstEvent * event) +{ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_STREAM_START:{ + if (splitpad->sent_stream_start) + goto drop_event; + splitpad->sent_stream_start = TRUE; + break; + } + case GST_EVENT_EOS:{ + if (gst_splitmux_end_of_part (splitmux, splitpad)) + // Continuing to next part, drop the EOS + goto drop_event; + break; + } + case GST_EVENT_SEGMENT:{ + GstSegment seg; + + gst_event_copy_segment (event, &seg); + + splitpad->segment.position = seg.position; + + if (splitpad->sent_segment) + goto drop_event; /* We already forwarded a segment event */ + + /* Calculate output segment */ + GST_LOG_OBJECT (splitpad, "Pad seg %" GST_SEGMENT_FORMAT + " got seg %" GST_SEGMENT_FORMAT + " play seg %" GST_SEGMENT_FORMAT, + &splitpad->segment, &seg, &splitmux->play_segment); + + /* If playing forward, take the stop time from the overall + * seg or play_segment */ + if (splitmux->play_segment.rate > 0.0) { + if (splitmux->play_segment.stop != -1) + seg.stop = splitmux->play_segment.stop; + else + seg.stop = splitpad->segment.stop; + } else { + /* Reverse playback from stop time to start time */ + /* See if an end point was requested in the seek */ + if (splitmux->play_segment.start != -1) { + seg.start = splitmux->play_segment.start; + seg.time = splitmux->play_segment.time; + } else { + seg.start = splitpad->segment.start; + seg.time = splitpad->segment.time; + } + } + + GST_INFO_OBJECT (splitpad, + "Forwarding segment %" GST_SEGMENT_FORMAT, &seg); + + gst_event_unref (event); + event = gst_event_new_segment (&seg); + splitpad->sent_segment = TRUE; + break; + } + case GST_EVENT_CAPS:{ + if (!gst_splitmux_check_new_caps (splitpad, event)) + goto drop_event; + splitpad->sent_caps = TRUE; + break; + } + default: + break; + } + + /* Make sure to send sticky events - from the part_pad directly */ + if (splitpad->sent_caps == FALSE || splitpad->sent_stream_start == FALSE) { + gst_pad_sticky_events_foreach (GST_PAD_CAST (part_pad), + (GstPadStickyEventsForeachFunction) (resend_sticky), splitpad); + splitpad->sent_caps = splitpad->sent_stream_start = TRUE; + } + + gst_pad_push_event ((GstPad *) (splitpad), event); + return; +drop_event: + gst_event_unref (event); + return; +} + +static GstFlowReturn +gst_splitmux_handle_buffer (GstSplitMuxSrc * splitmux, + SplitMuxSrcPad * splitpad, GstBuffer * buf) +{ + GstFlowReturn ret; + + if (splitpad->clear_next_discont) { + GST_LOG_OBJECT (splitpad, "Clearing discont flag on buffer"); + GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT); + splitpad->clear_next_discont = FALSE; + } + if (splitpad->set_next_discont) { + GST_LOG_OBJECT (splitpad, "Setting discont flag on buffer"); + GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); + splitpad->set_next_discont = FALSE; + } + + ret = gst_pad_push (GST_PAD_CAST (splitpad), buf); + + GST_LOG_OBJECT (splitpad, "Pad push returned %d", ret); + return ret; +} + +static void +gst_splitmux_pad_loop (GstPad * pad) +{ + /* Get one event/buffer from the associated part and push */ + SplitMuxSrcPad *splitpad = (SplitMuxSrcPad *) (pad); + GstSplitMuxSrc *splitmux = (GstSplitMuxSrc *) gst_pad_get_parent (pad); + GstDataQueueItem *item = NULL; + GstSplitMuxPartReader *reader = splitpad->reader; + GstPad *part_pad = splitpad->part_pad; + GstFlowReturn ret; + + GST_LOG_OBJECT (splitpad, "Popping data queue item from %" GST_PTR_FORMAT + " pad %" GST_PTR_FORMAT, reader, part_pad); + ret = gst_splitmux_part_reader_pop (reader, part_pad, &item); + if (ret == GST_FLOW_ERROR) + goto error; + if (ret == GST_FLOW_FLUSHING || item == NULL) + goto flushing; + + GST_DEBUG_OBJECT (splitpad, "Got data queue item %" GST_PTR_FORMAT, + item->object); + + if (GST_IS_EVENT (item->object)) { + GstEvent *event = (GstEvent *) (item->object); + gst_splitmux_handle_event (splitmux, splitpad, part_pad, event); + } else { + GstBuffer *buf = (GstBuffer *) (item->object); + GstFlowReturn ret = gst_splitmux_handle_buffer (splitmux, splitpad, buf); + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + /* Stop immediately on error or flushing */ + GST_INFO_OBJECT (splitpad, "Stopping due to pad_push() result %d", ret); + gst_pad_pause_task (pad); + if (ret <= GST_FLOW_EOS) { + const gchar *reason = gst_flow_get_name (ret); + GST_ELEMENT_ERROR (splitmux, STREAM, FAILED, + (_("Internal data flow error.")), + ("streaming task paused, reason %s (%d)", reason, ret)); + } + } + } + g_slice_free (GstDataQueueItem, item); + + gst_object_unref (splitmux); + return; + +error: + /* Fall through */ + GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, + ("Error reading part file %s", GST_STR_NULL (reader->path)), (NULL)); +flushing: + gst_pad_pause_task (pad); + gst_object_unref (splitmux); + return; +} + +static void +gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part) +{ + GList *cur; + + GST_DEBUG_OBJECT (splitmux, "Activating part %d", part); + + splitmux->cur_part = part; + gst_splitmux_part_reader_activate (splitmux->parts[part], + &splitmux->play_segment); + + SPLITMUX_SRC_PADS_LOCK (splitmux); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + SplitMuxSrcPad *splitpad = (SplitMuxSrcPad *) (cur->data); + splitpad->cur_part = part; + splitpad->reader = splitmux->parts[splitpad->cur_part]; + splitpad->part_pad = + gst_splitmux_part_reader_lookup_pad (splitpad->reader, + (GstPad *) (splitpad)); + + /* Make sure we start with a DISCONT */ + splitpad->set_next_discont = TRUE; + splitpad->clear_next_discont = FALSE; + + gst_pad_start_task (GST_PAD (splitpad), + (GstTaskFunction) gst_splitmux_pad_loop, splitpad, NULL); + } + SPLITMUX_SRC_PADS_UNLOCK (splitmux); +} + +static gboolean +gst_splitmux_src_start (GstSplitMuxSrc * splitmux) +{ + gboolean ret = FALSE; + GError *err = NULL; + gchar *basename = NULL; + gchar *dirname = NULL; + gchar **files; + GstClockTime next_offset = 0; + guint i; + + GST_OBJECT_LOCK (splitmux); + if (splitmux->location != NULL && splitmux->location[0] != '\0') { + basename = g_path_get_basename (splitmux->location); + dirname = g_path_get_dirname (splitmux->location); + } + GST_OBJECT_UNLOCK (splitmux); + + files = gst_split_util_find_files (dirname, basename, &err); + + if (files == NULL || *files == NULL) + goto no_files; + + splitmux->num_parts = g_strv_length (files); + + splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts); + + for (i = 0; i < splitmux->num_parts; i++) { + splitmux->parts[i] = gst_splitmux_part_create (splitmux, files[i]); + if (splitmux->parts[i] == NULL) + break; + + /* Figure out the next offset - the smallest one */ + gst_splitmux_part_reader_set_start_offset (splitmux->parts[i], next_offset); + if (!gst_splitmux_part_reader_prepare (splitmux->parts[i])) { + GST_WARNING_OBJECT (splitmux, + "Failed to prepare file part %s. Cannot play past there.", files[i]); + GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL), + ("Failed to prepare file part %s. Cannot play past there.", + files[i])); + gst_splitmux_part_reader_unprepare (splitmux->parts[i]); + g_object_unref (splitmux->parts[i]); + splitmux->parts[i] = NULL; + break; + } + + /* Extend our total duration to cover this part */ + splitmux->total_duration = + next_offset + + gst_splitmux_part_reader_get_duration (splitmux->parts[i]); + splitmux->play_segment.duration = splitmux->total_duration; + + next_offset = gst_splitmux_part_reader_get_end_offset (splitmux->parts[i]); + } + /* Store how many parts we actually created */ + splitmux->num_parts = i; + + if (splitmux->num_parts < 1) + goto failed_part; + + /* All done preparing, activate the first part */ + GST_INFO_OBJECT (splitmux, + "All parts prepared. Total duration %" GST_TIME_FORMAT + " Activating first part", GST_TIME_ARGS (splitmux->total_duration)); + gst_splitmux_src_activate_part (splitmux, 0); + + SPLITMUX_SRC_LOCK (splitmux); + splitmux->running = TRUE; + SPLITMUX_SRC_UNLOCK (splitmux); + ret = TRUE; +done: + if (err != NULL) + g_error_free (err); + g_strfreev (files); + g_free (basename); + g_free (dirname); + + return ret; + +/* ERRORS */ +no_files: + { + GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, ("%s", err->message), + ("Failed to find files in '%s' for pattern '%s'", + GST_STR_NULL (dirname), GST_STR_NULL (basename))); + goto done; + } +failed_part: + { + GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL), + ("Failed to open any files for reading")); + goto done; + } +} + +static gboolean +gst_splitmux_src_stop (GstSplitMuxSrc * splitmux) +{ + gboolean ret = TRUE; + guint i; + GList *cur; + + SPLITMUX_SRC_LOCK (splitmux); + if (!splitmux->running) + goto out; + + /* Stop and destroy all parts */ + for (i = 0; i < splitmux->num_parts; i++) { + gst_splitmux_part_reader_unprepare (splitmux->parts[i]); + g_object_unref (splitmux->parts[i]); + } + + SPLITMUX_SRC_PADS_LOCK (splitmux); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + SplitMuxSrcPad *tmp = (SplitMuxSrcPad *) (cur->data); + gst_pad_stop_task (GST_PAD (tmp)); + } + SPLITMUX_SRC_PADS_UNLOCK (splitmux); + + g_free (splitmux->parts); + splitmux->parts = NULL; + splitmux->num_parts = 0; + splitmux->running = FALSE; + splitmux->total_duration = GST_CLOCK_TIME_NONE; + /* Reset playback segment */ + gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME); +out: + SPLITMUX_SRC_UNLOCK (splitmux); + return ret; +} + +static GstPad * +gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, GstPad * pad, + GstSplitMuxSrc * splitmux) +{ + GList *cur; + gchar *pad_name = gst_pad_get_name (pad); + GstPad *target = NULL; + gboolean is_new_pad = FALSE; + + SPLITMUX_SRC_LOCK (splitmux); + SPLITMUX_SRC_PADS_LOCK (splitmux); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + GstPad *tmp = (GstPad *) (cur->data); + if (g_str_equal (GST_PAD_NAME (tmp), pad_name)) { + target = tmp; + break; + } + } + + if (target == NULL && !splitmux->pads_complete) { + /* No pad found, create one */ + target = g_object_new (SPLITMUX_TYPE_SRC_PAD, + "name", pad_name, "direction", GST_PAD_SRC, NULL); + splitmux->pads = g_list_prepend (splitmux->pads, target); + + gst_pad_set_active (target, TRUE); + is_new_pad = TRUE; + } + SPLITMUX_SRC_PADS_UNLOCK (splitmux); + SPLITMUX_SRC_UNLOCK (splitmux); + + g_free (pad_name); + + if (target == NULL) + goto pad_not_found; + + if (is_new_pad) + gst_element_add_pad (GST_ELEMENT_CAST (splitmux), target); + + return target; + +pad_not_found: + GST_ELEMENT_ERROR (splitmux, STREAM, FAILED, (NULL), + ("Stream part %s contains extra unknown pad %" GST_PTR_FORMAT, + part->path, pad)); + return NULL; +} + +static void +gst_splitmux_part_prepared (GstSplitMuxPartReader * reader, + GstSplitMuxSrc * splitmux) +{ + gboolean need_no_more_pads; + + SPLITMUX_SRC_LOCK (splitmux); + need_no_more_pads = !splitmux->pads_complete; + splitmux->pads_complete = TRUE; + SPLITMUX_SRC_UNLOCK (splitmux); + + if (need_no_more_pads) { + GST_DEBUG_OBJECT (splitmux, "Signalling no-more-pads"); + gst_element_no_more_pads (GST_ELEMENT_CAST (splitmux)); + } +} + +static void +gst_splitmux_push_event (GstSplitMuxSrc * splitmux, GstEvent * e, + guint32 seqnum) +{ + GList *cur; + + if (seqnum) + gst_event_set_seqnum (e, seqnum); + + SPLITMUX_SRC_PADS_LOCK (splitmux); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + GstPad *pad = GST_PAD_CAST (cur->data); + gst_event_ref (e); + gst_pad_push_event (pad, e); + } + SPLITMUX_SRC_PADS_UNLOCK (splitmux); + + gst_event_unref (e); +} + +static void +gst_splitmux_push_flush_stop (GstSplitMuxSrc * splitmux, guint32 seqnum) +{ + GstEvent *e = gst_event_new_flush_stop (TRUE); + GList *cur; + + if (seqnum) + gst_event_set_seqnum (e, seqnum); + + SPLITMUX_SRC_PADS_LOCK (splitmux); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + SplitMuxSrcPad *target = (SplitMuxSrcPad *) (cur->data); + + gst_event_ref (e); + gst_pad_push_event (GST_PAD_CAST (target), e); + target->sent_caps = FALSE; + target->sent_stream_start = FALSE; + target->sent_segment = FALSE; + } + SPLITMUX_SRC_PADS_UNLOCK (splitmux); + + gst_event_unref (e); +} + +/* Callback for when a part finishes and we need to move to the next */ +static gboolean +gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad) +{ + gint next_part = -1; + gint cur_part = cur_part = splitpad->cur_part; + gboolean res = FALSE; + + if (splitmux->play_segment.rate >= 0.0) { + if (cur_part + 1 < splitmux->num_parts) + next_part = cur_part + 1; + /* Make sure the transition is seamless */ + splitpad->set_next_discont = FALSE; + splitpad->clear_next_discont = TRUE; + } else { + /* Reverse play - move to previous segment */ + if (cur_part > 0) { + next_part = cur_part - 1; + /* Non-seamless transition in reverse */ + splitpad->set_next_discont = TRUE; + splitpad->clear_next_discont = FALSE; + } + } + + SPLITMUX_SRC_LOCK (splitmux); + + /* If all pads are done with this part, deactivate it */ + if (gst_splitmux_part_is_eos (splitmux->parts[splitpad->cur_part])) + gst_splitmux_part_reader_deactivate (splitmux->parts[cur_part]); + + if (next_part != -1) { + GST_DEBUG_OBJECT (splitmux, "At EOS on pad %" GST_PTR_FORMAT + " moving to part %d", splitpad, next_part); + splitpad->cur_part = next_part; + splitpad->reader = splitmux->parts[splitpad->cur_part]; + splitpad->part_pad = + gst_splitmux_part_reader_lookup_pad (splitpad->reader, + (GstPad *) (splitpad)); + + if (splitmux->cur_part != next_part) { + GstSegment tmp; + /* If moving backward into a new part, set stop + * to -1 to ensure we play the entire file - workaround + * a bug in qtdemux that misses bits at the end */ + gst_segment_copy_into (&splitmux->play_segment, &tmp); + if (tmp.rate < 0) + tmp.stop = -1; + + /* This is the first pad to move to the new part, activate it */ + splitmux->cur_part = next_part; + GST_DEBUG_OBJECT (splitpad, + "First pad to change part. Activating part %d with seg %" + GST_SEGMENT_FORMAT, next_part, &tmp); + gst_splitmux_part_reader_activate (splitpad->reader, &tmp); + } + res = TRUE; + } + + SPLITMUX_SRC_UNLOCK (splitmux); + return res; +} + +G_DEFINE_TYPE (SplitMuxSrcPad, splitmux_src_pad, GST_TYPE_PAD); + +static void +splitmux_src_pad_constructed (GObject * pad) +{ + gst_pad_set_event_function (GST_PAD (pad), + GST_DEBUG_FUNCPTR (splitmux_src_pad_event)); + gst_pad_set_query_function (GST_PAD (pad), + GST_DEBUG_FUNCPTR (splitmux_src_pad_query)); +} + +static void +splitmux_src_pad_class_init (SplitMuxSrcPadClass * klass) +{ + GObjectClass *gobject_klass = (GObjectClass *) (klass); + + gobject_klass->constructed = splitmux_src_pad_constructed; +} + +static void +splitmux_src_pad_init (SplitMuxSrcPad * pad) +{ +} + +/* Event handler for source pads. Proxy events into the child + * parts as needed + */ +static gboolean +splitmux_src_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (parent); + gboolean ret = FALSE; + + GST_DEBUG_OBJECT (parent, "event %" GST_PTR_FORMAT + " on %" GST_PTR_FORMAT, event, pad); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK:{ + GstFormat format; + gdouble rate; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + guint32 seqnum; + gint i; + GstClockTime part_start, position; + GList *cur; + GstSegment tmp; + + gst_event_parse_seek (event, &rate, &format, &flags, + &start_type, &start, &stop_type, &stop); + + if (format != GST_FORMAT_TIME) { + GST_DEBUG_OBJECT (splitmux, "can only seek on TIME"); + goto error; + } + /* FIXME: Support non-flushing seeks, which might never wake up */ + if (!(flags & GST_SEEK_FLAG_FLUSH)) { + GST_DEBUG_OBJECT (splitmux, "Only flushing seeks supported"); + goto error; + } + seqnum = gst_event_get_seqnum (event); + + SPLITMUX_SRC_LOCK (splitmux); + if (!splitmux->running || splitmux->num_parts < 1) { + /* Not started yet */ + SPLITMUX_SRC_UNLOCK (splitmux); + goto error; + } + + gst_segment_copy_into (&splitmux->play_segment, &tmp); + + if (!gst_segment_do_seek (&tmp, rate, + format, flags, start_type, start, stop_type, stop, NULL)) { + /* Invalid seek requested, ignore it */ + SPLITMUX_SRC_UNLOCK (splitmux); + goto error; + } + position = tmp.position; + + GST_DEBUG_OBJECT (splitmux, "Performing seek with seg %" + GST_SEGMENT_FORMAT, &tmp); + + GST_DEBUG_OBJECT (splitmux, + "Handling flushing seek. Sending flush start"); + + /* Send flush_start */ + gst_splitmux_push_event (splitmux, gst_event_new_flush_start (), seqnum); + + /* Stop all parts, which will work because of the flush */ + SPLITMUX_SRC_PADS_LOCK (splitmux); + SPLITMUX_SRC_UNLOCK (splitmux); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + SplitMuxSrcPad *target = (SplitMuxSrcPad *) (cur->data); + GstSplitMuxPartReader *reader = splitmux->parts[target->cur_part]; + gst_splitmux_part_reader_deactivate (reader); + } + + /* Shut down pad tasks */ + GST_DEBUG_OBJECT (splitmux, "Pausing pad tasks"); + for (cur = g_list_first (splitmux->pads); + cur != NULL; cur = g_list_next (cur)) { + GstPad *splitpad = (GstPad *) (cur->data); + gst_pad_pause_task (GST_PAD (splitpad)); + } + SPLITMUX_SRC_PADS_UNLOCK (splitmux); + SPLITMUX_SRC_LOCK (splitmux); + + /* Send flush stop */ + GST_DEBUG_OBJECT (splitmux, "Sending flush stop"); + gst_splitmux_push_flush_stop (splitmux, seqnum); + + /* Everything is stopped, so update the play_segment */ + gst_segment_copy_into (&tmp, &splitmux->play_segment); + + /* Work out where to start from now */ + for (i = 0; i < splitmux->num_parts; i++) { + GstSplitMuxPartReader *reader = splitmux->parts[i]; + GstClockTime part_end = + gst_splitmux_part_reader_get_end_offset (reader); + + if (part_end > position) + break; + } + if (i == splitmux->num_parts) + i = splitmux->num_parts - 1; + + part_start = + gst_splitmux_part_reader_get_start_offset (splitmux->parts[i]); + + GST_DEBUG_OBJECT (splitmux, + "Seek to time %" GST_TIME_FORMAT " landed in part %d offset %" + GST_TIME_FORMAT, GST_TIME_ARGS (position), + i, GST_TIME_ARGS (position - part_start)); + + gst_splitmux_src_activate_part (splitmux, i); + + ret = TRUE; + SPLITMUX_SRC_UNLOCK (splitmux); + } + default: + break; + } + + gst_event_unref (event); +error: + return ret; +} + +static gboolean +splitmux_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + /* Query handler for source pads. Proxy queries into the child + * parts as needed + */ + GstSplitMuxSrc *splitmux = GST_SPLITMUX_SRC (parent); + gboolean ret = FALSE; + + GST_LOG_OBJECT (parent, "query %" GST_PTR_FORMAT + " on %" GST_PTR_FORMAT, query, pad); + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_CAPS: + case GST_QUERY_POSITION:{ + GstSplitMuxPartReader *part; + SplitMuxSrcPad *anypad; + + SPLITMUX_SRC_LOCK (splitmux); + SPLITMUX_SRC_PADS_LOCK (splitmux); + anypad = (SplitMuxSrcPad *) (splitmux->pads->data); + part = splitmux->parts[anypad->cur_part]; + ret = gst_splitmux_part_reader_src_query (part, pad, query); + SPLITMUX_SRC_PADS_UNLOCK (splitmux); + SPLITMUX_SRC_UNLOCK (splitmux); + break; + } + case GST_QUERY_DURATION:{ + GstFormat fmt; + gst_query_parse_duration (query, &fmt, NULL); + if (fmt != GST_FORMAT_TIME) + break; + + SPLITMUX_SRC_LOCK (splitmux); + if (splitmux->total_duration > 0) { + gst_query_set_duration (query, GST_FORMAT_TIME, + splitmux->total_duration); + ret = TRUE; + } + SPLITMUX_SRC_UNLOCK (splitmux); + break; + } + case GST_QUERY_SEEKING:{ + GstFormat format; + + gst_query_parse_seeking (query, &format, NULL, NULL, NULL); + if (format != GST_FORMAT_TIME) + break; + + SPLITMUX_SRC_LOCK (splitmux); + gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0, + splitmux->total_duration); + ret = TRUE; + SPLITMUX_SRC_UNLOCK (splitmux); + + break; + } + default: + break; + } + return ret; +} + + +gboolean +register_splitmuxsrc (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsrc", 0, + "Split File Demuxing Source"); + + return gst_element_register (plugin, "splitmuxsrc", GST_RANK_NONE, + GST_TYPE_SPLITMUX_SRC); +} diff --git a/gst/multifile/gstsplitmuxsrc.h b/gst/multifile/gstsplitmuxsrc.h new file mode 100644 index 000000000..228453c17 --- /dev/null +++ b/gst/multifile/gstsplitmuxsrc.h @@ -0,0 +1,108 @@ +/* GStreamer Split Muxed File Source + * Copyright (C) 2014 Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifndef __GST_SPLITMUX_SRC_H__ +#define __GST_SPLITMUX_SRC_H__ + +#include <gst/gst.h> + +#include "gstsplitmuxpartreader.h" + +G_BEGIN_DECLS + +#define GST_TYPE_SPLITMUX_SRC \ + (gst_splitmux_src_get_type()) +#define GST_SPLITMUX_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SPLITMUX_SRC,GstSplitMuxSrc)) +#define GST_SPLITMUX_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SPLITMUX_SRC,GstSplitMuxSrcClass)) +#define GST_IS_SPLITMUX_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SPLITMUX_SRC)) +#define GST_IS_SPLITMUX_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SPLITMUX_SRC)) + +typedef struct _GstSplitMuxSrc GstSplitMuxSrc; +typedef struct _GstSplitMuxSrcClass GstSplitMuxSrcClass; + +struct _GstSplitMuxSrc +{ + GstBin parent; + + GMutex lock; + gboolean running; + + gchar *location; /* OBJECT_LOCK */ + + GstSplitMuxPartReader **parts; + guint num_parts; + guint cur_part; + + gboolean pads_complete; + GMutex pads_lock; + GList *pads; /* pads_lock */ + + GstClockTime total_duration; + GstSegment play_segment; +}; + +struct _GstSplitMuxSrcClass +{ + GstBinClass parent_class; +}; + +GType splitmux_src_pad_get_type (void); +#define SPLITMUX_TYPE_SRC_PAD splitmux_src_pad_get_type() +#define SPLITMUX_SRC_PAD_CAST(p) ((SplitMuxSrcPad *)(p)) +#define SPLITMUX_SRC_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),SPLITMUX_TYPE_SRC_PAD,SplitMuxSrcPad)) + +struct _SplitMuxSrcPad +{ + GstPad parent; + + guint cur_part; + GstSplitMuxPartReader *reader; + GstPad *part_pad; + + GstSegment segment; + + gboolean set_next_discont; + gboolean clear_next_discont; + + gboolean sent_stream_start; + gboolean sent_caps; + gboolean sent_segment; +}; + +struct _SplitMuxSrcPadClass +{ + GstPadClass parent; +}; + +GType gst_splitmux_src_get_type (void); +gboolean register_splitmuxsrc (GstPlugin * plugin); + +#define SPLITMUX_SRC_LOCK(s) g_mutex_lock(&(s)->lock) +#define SPLITMUX_SRC_UNLOCK(s) g_mutex_unlock(&(s)->lock) + +#define SPLITMUX_SRC_PADS_LOCK(s) g_mutex_lock(&(s)->pads_lock) +#define SPLITMUX_SRC_PADS_UNLOCK(s) g_mutex_unlock(&(s)->pads_lock) + +G_END_DECLS + +#endif /* __GST_SPLITMUX_SRC_H__ */ diff --git a/gst/multifile/gstsplitutils.c b/gst/multifile/gstsplitutils.c new file mode 100644 index 000000000..9b088a5f7 --- /dev/null +++ b/gst/multifile/gstsplitutils.c @@ -0,0 +1,105 @@ +/* GStreamer Split Source Utility Functions + * Copyright (C) 2011 Collabora Ltd. <tim.muller@collabora.co.uk> + * Copyright (C) 2014 Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <string.h> + +#include "gstsplitutils.h" +#include "patternspec.h" + +static int +gst_split_util_array_sortfunc (gchar ** a, gchar ** b) +{ + return strcmp (*a, *b); +} + +gchar ** +gst_split_util_find_files (const gchar * dirname, + const gchar * basename, GError ** err) +{ + PatternSpec *pspec; + GPtrArray *files; + const gchar *name; + GDir *dir; + + if (dirname == NULL || basename == NULL) + goto invalid_location; + + GST_INFO ("checking in directory '%s' for pattern '%s'", dirname, basename); + + dir = g_dir_open (dirname, 0, err); + if (dir == NULL) + return NULL; + + if (DEFAULT_PATTERN_MATCH_MODE == MATCH_MODE_UTF8 && + !g_utf8_validate (basename, -1, NULL)) { + goto not_utf8; + } + + /* mode will be AUTO on linux/unix and UTF8 on win32 */ + pspec = pattern_spec_new (basename, DEFAULT_PATTERN_MATCH_MODE); + + files = g_ptr_array_new (); + + while ((name = g_dir_read_name (dir))) { + GST_TRACE ("check: %s", name); + if (pattern_match_string (pspec, name)) { + GST_DEBUG ("match: %s", name); + g_ptr_array_add (files, g_build_filename (dirname, name, NULL)); + } + } + + if (files->len == 0) + goto no_matches; + + g_ptr_array_sort (files, (GCompareFunc) gst_split_util_array_sortfunc); + g_ptr_array_add (files, NULL); + + pattern_spec_free (pspec); + g_dir_close (dir); + + return (gchar **) g_ptr_array_free (files, FALSE); + +/* ERRORS */ +invalid_location: + { + g_set_error_literal (err, G_FILE_ERROR, G_FILE_ERROR_INVAL, + "No filename specified."); + return NULL; + } +not_utf8: + { + g_dir_close (dir); + g_set_error_literal (err, G_FILE_ERROR, G_FILE_ERROR_INVAL, + "Filename pattern must be UTF-8 on Windows."); + return NULL; + } +no_matches: + { + pattern_spec_free (pspec); + g_dir_close (dir); + g_set_error_literal (err, G_FILE_ERROR, G_FILE_ERROR_NOENT, + "Found no files matching the pattern."); + return NULL; + } +} diff --git a/gst/multifile/gstsplitutils.h b/gst/multifile/gstsplitutils.h new file mode 100644 index 000000000..2c78b233f --- /dev/null +++ b/gst/multifile/gstsplitutils.h @@ -0,0 +1,40 @@ +/* GStreamer Split Source Utility Functions + * Copyright (C) 2011 Collabora Ltd. <tim.muller@collabora.co.uk> + * Copyright (C) 2014 Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SPLITUTILS_H__ +#define __GST_SPLITUTILS_H__ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +#ifdef G_OS_WIN32 +#define DEFAULT_PATTERN_MATCH_MODE MATCH_MODE_UTF8 +#else +#define DEFAULT_PATTERN_MATCH_MODE MATCH_MODE_AUTO +#endif + +gchar ** +gst_split_util_find_files (const gchar * dirname, + const gchar * basename, GError ** err); + +G_END_DECLS + +#endif diff --git a/gst/multifile/test-splitmuxpartreader.c b/gst/multifile/test-splitmuxpartreader.c new file mode 100644 index 000000000..18756a6ee --- /dev/null +++ b/gst/multifile/test-splitmuxpartreader.c @@ -0,0 +1,104 @@ +#include <gst/gst.h> +#include "gstsplitmuxpartreader.h" +#include "gstsplitmuxsrc.h" + +GST_DEBUG_CATEGORY_EXTERN (splitmux_debug); + +static const gchar *const path = "out001.mp4"; + +typedef struct _CustomData +{ + GstSplitMuxPartReader *reader; + GMainLoop *main_loop; + GstBus *bus; +} CustomData; + +static void +part_prepared (GstSplitMuxPartReader * reader) +{ + g_print ("Part prepared\n"); +} + +static gboolean +handle_message (GstBus * bus, GstMessage * msg, CustomData * data) +{ + GError *err; + gchar *debug_info; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_ERROR: + gst_message_parse_error (msg, &err, &debug_info); + g_print ("Error received from element %s: %s\n", + GST_OBJECT_NAME (msg->src), err->message); + g_print ("Debugging information: %s\n", debug_info ? debug_info : "none"); + g_clear_error (&err); + g_free (debug_info); + g_main_loop_quit (data->main_loop); + break; + case GST_MESSAGE_EOS: + g_print ("End-Of-Stream reached.\n"); + g_main_loop_quit (data->main_loop); + break; + default: + break; + } + + return TRUE; +} + +static gboolean +start_reader (CustomData * data) +{ + g_print ("Preparing part reader for %s\n", path); + gst_splitmux_part_reader_prepare (data->reader); + return FALSE; +} + +static GstPad * +handle_get_pad (GstSplitMuxPartReader * reader, GstPad * src_pad, + CustomData * data) +{ + /* Create a dummy target pad for the reader */ + GstPad *new_pad = g_object_new (SPLITMUX_TYPE_SRC_PAD, + "name", GST_PAD_NAME (src_pad), "direction", GST_PAD_SRC, NULL); + + g_print ("Creating new dummy pad %s\n", GST_PAD_NAME (src_pad)); + + return new_pad; +} + +int +main (int argc, char **argv) +{ + CustomData data; + + gst_init (&argc, &argv); + + data.main_loop = g_main_loop_new (NULL, FALSE); + + data.reader = g_object_new (GST_TYPE_SPLITMUX_PART_READER, NULL); + data.bus = gst_element_get_bus (GST_ELEMENT_CAST (data.reader)); + + /* Listen for bus messages */ + gst_bus_add_watch (data.bus, (GstBusFunc) handle_message, &data); + + gst_splitmux_part_reader_set_location (data.reader, path); + + /* Connect to prepare signal */ + g_signal_connect (data.reader, "prepared", (GCallback) part_prepared, &data); + gst_splitmux_part_reader_set_callbacks (data.reader, &data, + (GstSplitMuxPartReaderPadCb) handle_get_pad); + + g_idle_add ((GSourceFunc) start_reader, &data); + + /* Run mainloop */ + g_main_loop_run (data.main_loop); + + gst_splitmux_part_reader_unprepare (data.reader); + + g_main_loop_unref (data.main_loop); + gst_object_unref (data.bus); + g_object_unref (data.reader); + + return 0; +} diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index 0d3288acd..407e11ff4 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -198,7 +198,7 @@ check_matroska = endif if USE_PLUGIN_MULTIFILE -check_multifile = elements/multifile +check_multifile = elements/multifile elements/splitmux else check_multifile = endif diff --git a/tests/check/elements/.gitignore b/tests/check/elements/.gitignore index e2b47de91..dcfa3eac7 100644 --- a/tests/check/elements/.gitignore +++ b/tests/check/elements/.gitignore @@ -58,6 +58,7 @@ rtprtx shapewipe souphttpsrc spectrum +splitmux sunaudio udpsink udpsrc diff --git a/tests/check/elements/splitmux.c b/tests/check/elements/splitmux.c new file mode 100644 index 000000000..00698e24b --- /dev/null +++ b/tests/check/elements/splitmux.c @@ -0,0 +1,126 @@ +/* GStreamer unit test for splitmuxsrc/sink elements + * + * Copyright (C) 2007 David A. Schleef <ds@schleef.org> + * Copyright (C) 2015 Jan Schmidt <jan@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <glib/gstdio.h> +#include <unistd.h> + +#include <gst/check/gstcheck.h> +#include <stdlib.h> +#include <unistd.h> + +gchar *tmpdir = NULL; + +static void +tempdir_setup (void) +{ + const gchar *systmp = g_get_tmp_dir (); + tmpdir = g_build_filename (systmp, "splitmux-test-XXXXXX", NULL); + /* Rewrites tmpdir template input: */ + tmpdir = g_mkdtemp (tmpdir); +} + +static void +tempdir_cleanup (void) +{ + GDir *d; + const gchar *f; + + fail_if (tmpdir == NULL); + + d = g_dir_open (tmpdir, 0, NULL); + fail_if (d == NULL); + + while ((f = g_dir_read_name (d)) != NULL) + fail_if (g_remove (f) != 0); + g_dir_close (d); + + fail_if (g_remove (tmpdir) != 0); + + g_free (tmpdir); + tmpdir = NULL; +} + +static GstMessage * +run_pipeline (GstElement * pipeline) +{ + GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline)); + GstMessage *msg; + + gst_element_set_state (pipeline, GST_STATE_PLAYING); + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + gst_element_set_state (pipeline, GST_STATE_NULL); + + gst_object_unref (bus); + + return msg; +} + +GST_START_TEST (test_splitmuxsrc) +{ + GstMessage *msg; + GstElement *pipeline; + GstElement *fakesink; + gchar *in_pattern; + gchar *uri; + + pipeline = gst_element_factory_make ("playbin", NULL); + fail_if (pipeline == NULL); + + fakesink = gst_element_factory_make ("fakesink", NULL); + fail_if (fakesink == NULL); + g_object_set (G_OBJECT (pipeline), "video-sink", fakesink, NULL); + + in_pattern = g_build_filename (GST_TEST_FILES_PATH, "splitvideo*.ogg", NULL); + uri = g_strdup_printf ("splitmux://%s", in_pattern); + g_free (in_pattern); + + g_object_set (G_OBJECT (pipeline), "uri", uri, NULL); + g_free (uri); + + msg = run_pipeline (pipeline); + gst_object_unref (pipeline); + + fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR); + gst_message_unref (msg); +} + +GST_END_TEST; + +static Suite * +splitmux_suite (void) +{ + Suite *s = suite_create ("splitmux"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + + tcase_add_checked_fixture (tc_chain, tempdir_setup, tempdir_cleanup); + + tcase_add_test (tc_chain, test_splitmuxsrc); + + return s; +} + +GST_CHECK_MAIN (splitmux); diff --git a/tests/files/splitvideo00.ogg b/tests/files/splitvideo00.ogg Binary files differnew file mode 100644 index 000000000..e9db2f466 --- /dev/null +++ b/tests/files/splitvideo00.ogg diff --git a/tests/files/splitvideo01.ogg b/tests/files/splitvideo01.ogg Binary files differnew file mode 100644 index 000000000..3e91c53da --- /dev/null +++ b/tests/files/splitvideo01.ogg diff --git a/tests/files/splitvideo02.ogg b/tests/files/splitvideo02.ogg Binary files differnew file mode 100644 index 000000000..921671b28 --- /dev/null +++ b/tests/files/splitvideo02.ogg |