summaryrefslogtreecommitdiff
path: root/gst/multifile/gstsplitmuxsink.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/multifile/gstsplitmuxsink.c')
-rw-r--r--gst/multifile/gstsplitmuxsink.c1460
1 files changed, 1460 insertions, 0 deletions
diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c
new file mode 100644
index 000000000..c6b4e4a6f
--- /dev/null
+++ b/gst/multifile/gstsplitmuxsink.c
@@ -0,0 +1,1460 @@
+/* GStreamer Muxer bin that splits output stream by size/time
+ * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-splitmuxsink
+ * @short_description: Muxer wrapper for splitting output stream by size or time
+ *
+ * This element wraps a muxer and a sink, and starts a new file when the mux
+ * contents are about to cross a threshold of maximum size of maximum time,
+ * splitting at video keyframe boundaries. Exactly one input video stream
+ * is required, with as many accompanying audio and subtitle streams as
+ * desired.
+ *
+ * By default, it uses mp4mux and filesink, but they can be changed via
+ * the 'muxer' and 'sink' properties.
+ *
+ * The minimum file size is 1 GOP, however - so limits may be overrun if the
+ * distance between any 2 keyframes is larger than the limits.
+ *
+ * The splitting process is driven by the video stream contents, and
+ * the video stream must contain closed GOPs for the output file parts
+ * to be played individually correctly.
+ *
+ * <refsect2>
+ * <title>Example pipelines</title>
+ * |[
+ * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc max-key-int=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
+ * ]|
+ *
+ * Records a video stream captured from a v4l2 device and muxes it into
+ * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
+ * and 1MB maximum size.
+ *
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+#include "gstsplitmuxsink.h"
+
+GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
+#define GST_CAT_DEFAULT splitmux_debug
+
+#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
+#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
+#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
+
+enum
+{
+ PROP_0,
+ PROP_LOCATION,
+ PROP_MAX_SIZE_TIME,
+ PROP_MAX_SIZE_BYTES,
+ PROP_MUXER_OVERHEAD,
+ PROP_MUXER,
+ PROP_SINK
+};
+
+#define DEFAULT_MAX_SIZE_TIME 0
+#define DEFAULT_MAX_SIZE_BYTES 0
+#define DEFAULT_MUXER_OVERHEAD 0.02
+#define DEFAULT_MUXER "mp4mux"
+#define DEFAULT_SINK "filesink"
+
+static GstStaticPadTemplate video_sink_template =
+GST_STATIC_PAD_TEMPLATE ("video",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS_ANY);
+static GstStaticPadTemplate audio_sink_template =
+GST_STATIC_PAD_TEMPLATE ("audio_%u",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS_ANY);
+static GstStaticPadTemplate subtitle_sink_template =
+GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS_ANY);
+
+static GQuark PAD_CONTEXT;
+
+static void
+_do_init (void)
+{
+ PAD_CONTEXT = g_quark_from_static_string ("pad-context");
+}
+
+#define gst_splitmux_sink_parent_class parent_class
+G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
+ _do_init ());
+
+static gboolean create_elements (GstSplitMuxSink * splitmux);
+static gboolean create_sink (GstSplitMuxSink * splitmux);
+static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_splitmux_sink_dispose (GObject * object);
+static void gst_splitmux_sink_finalize (GObject * object);
+
+static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
+static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
+
+static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
+ element, GstStateChange transition);
+
+static void bus_handler (GstBin * bin, GstMessage * msg);
+static void set_next_filename (GstSplitMuxSink * splitmux);
+static void start_next_fragment (GstSplitMuxSink * splitmux);
+static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+
+static MqStreamBuf *
+mq_stream_buf_new (void)
+{
+ return g_slice_new0 (MqStreamBuf);
+}
+
+static void
+mq_stream_buf_free (MqStreamBuf * data)
+{
+ g_slice_free (MqStreamBuf, data);
+}
+
+static void
+gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
+{
+ GObjectClass *gobject_class = (GObjectClass *) klass;
+ GstElementClass *gstelement_class = (GstElementClass *) klass;
+ GstBinClass *gstbin_class = (GstBinClass *) klass;
+
+ gobject_class->set_property = gst_splitmux_sink_set_property;
+ gobject_class->get_property = gst_splitmux_sink_get_property;
+ gobject_class->dispose = gst_splitmux_sink_dispose;
+ gobject_class->finalize = gst_splitmux_sink_finalize;
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "Split Muxing Bin", "Generic/Bin/Muxer",
+ "Convenience bin that muxes incoming streams into multiple time/size limited files",
+ "Jan Schmidt <jan@centricular.com>");
+
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&video_sink_template));
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&audio_sink_template));
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&subtitle_sink_template));
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
+ gstelement_class->request_new_pad =
+ GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
+ gstelement_class->release_pad =
+ GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
+
+ gstbin_class->handle_message = bus_handler;
+
+ g_object_class_install_property (gobject_class, PROP_LOCATION,
+ g_param_spec_string ("location", "File Output Pattern",
+ "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
+ NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
+ g_param_spec_double ("mux-overhead", "Muxing Overhead",
+ "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
+ DEFAULT_MUXER_OVERHEAD,
+ G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
+ g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
+ "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
+ DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
+ g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
+ "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
+ DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MUXER,
+ g_param_spec_object ("muxer", "Muxer",
+ "The muxer element to use (NULL = default mp4mux)",
+ GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_SINK,
+ g_param_spec_object ("sink", "Sink",
+ "The sink element (or element chain) to use (NULL = default filesink)",
+ GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static void
+gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
+{
+ g_mutex_init (&splitmux->lock);
+ g_cond_init (&splitmux->data_cond);
+
+ splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
+ splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
+ splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
+}
+
+static void
+gst_splitmux_reset (GstSplitMuxSink * splitmux)
+{
+ if (splitmux->mq)
+ gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
+ if (splitmux->muxer)
+ gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
+ if (splitmux->active_sink)
+ gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
+
+ splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
+ NULL;
+}
+
+static void
+gst_splitmux_sink_dispose (GObject * object)
+{
+ GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+
+ /* Calling parent dispose invalidates all child pointers */
+ splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
+ NULL;
+}
+
+static void
+gst_splitmux_sink_finalize (GObject * object)
+{
+ GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
+ g_cond_clear (&splitmux->data_cond);
+ if (splitmux->provided_sink)
+ gst_object_unref (splitmux->provided_sink);
+
+ g_free (splitmux->location);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_splitmux_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
+
+ switch (prop_id) {
+ case PROP_LOCATION:{
+ GST_OBJECT_LOCK (splitmux);
+ g_free (splitmux->location);
+ splitmux->location = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ }
+ case PROP_MAX_SIZE_BYTES:
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->threshold_bytes = g_value_get_uint64 (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MAX_SIZE_TIME:
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->threshold_time = g_value_get_uint64 (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MUXER_OVERHEAD:
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->mux_overhead = g_value_get_double (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_SINK:
+ GST_SPLITMUX_LOCK (splitmux);
+ if (splitmux->provided_sink)
+ gst_object_unref (splitmux->provided_sink);
+ splitmux->provided_sink = g_value_dup_object (value);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ case PROP_MUXER:
+ GST_SPLITMUX_LOCK (splitmux);
+ if (splitmux->provided_muxer)
+ gst_object_unref (splitmux->provided_muxer);
+ splitmux->provided_muxer = g_value_dup_object (value);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_splitmux_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
+
+ switch (prop_id) {
+ case PROP_LOCATION:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_string (value, splitmux->location);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MAX_SIZE_BYTES:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_uint64 (value, splitmux->threshold_bytes);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MAX_SIZE_TIME:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_uint64 (value, splitmux->threshold_time);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MUXER_OVERHEAD:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_double (value, splitmux->mux_overhead);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_SINK:
+ GST_SPLITMUX_LOCK (splitmux);
+ g_value_set_object (value, splitmux->provided_sink);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ case PROP_MUXER:
+ GST_SPLITMUX_LOCK (splitmux);
+ g_value_set_object (value, splitmux->provided_muxer);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstPad *
+mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
+{
+ gchar *tmp, *sinkname, *srcname;
+ GstPad *mq_src;
+
+ sinkname = gst_pad_get_name (sink_pad);
+ tmp = sinkname + 5;
+ srcname = g_strdup_printf ("src_%s", tmp);
+
+ mq_src = gst_element_get_static_pad (mq, srcname);
+
+ g_free (sinkname);
+ g_free (srcname);
+
+ return mq_src;
+}
+
+static gboolean
+get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
+ GstPad ** src_pad)
+{
+ GstPad *mq_sink;
+ GstPad *mq_src;
+
+ /* Request a pad from multiqueue, then connect this one, then
+ * discover the corresponding output pad and return both */
+ mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
+ if (mq_sink == NULL)
+ return FALSE;
+
+ mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
+ if (mq_src == NULL)
+ goto fail;
+
+ *sink_pad = mq_sink;
+ *src_pad = mq_src;
+
+ return TRUE;
+
+fail:
+ gst_element_release_request_pad (splitmux->mq, mq_sink);
+ return FALSE;
+}
+
+static MqStreamCtx *
+mq_stream_ctx_new (GstSplitMuxSink * splitmux)
+{
+ MqStreamCtx *ctx;
+
+ ctx = g_new0 (MqStreamCtx, 1);
+ g_atomic_int_set (&ctx->refcount, 1);
+ ctx->splitmux = splitmux;
+ gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
+ gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
+ ctx->in_running_time = ctx->out_running_time = 0;
+ g_queue_init (&ctx->queued_bufs);
+ return ctx;
+}
+
+static void
+mq_stream_ctx_free (MqStreamCtx * ctx)
+{
+ g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
+ g_queue_clear (&ctx->queued_bufs);
+ g_free (ctx);
+}
+
+static void
+mq_stream_ctx_unref (MqStreamCtx * ctx)
+{
+ if (g_atomic_int_dec_and_test (&ctx->refcount))
+ mq_stream_ctx_free (ctx);
+}
+
+static void
+mq_stream_ctx_ref (MqStreamCtx * ctx)
+{
+ g_atomic_int_inc (&ctx->refcount);
+}
+
+static void
+_pad_block_destroy_sink_notify (MqStreamCtx * ctx)
+{
+ ctx->sink_pad_block_id = 0;
+ mq_stream_ctx_unref (ctx);
+}
+
+static void
+_pad_block_destroy_src_notify (MqStreamCtx * ctx)
+{
+ ctx->src_pad_block_id = 0;
+ mq_stream_ctx_unref (ctx);
+}
+
+/* Called with lock held, drops the lock to send EOS to the
+ * pad
+ */
+static void
+send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
+{
+ GstEvent *eos;
+ GstPad *pad;
+
+ eos = gst_event_new_eos ();
+ pad = gst_pad_get_peer (ctx->srcpad);
+
+ ctx->out_eos = TRUE;
+
+ GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ gst_pad_send_event (pad, eos);
+ GST_SPLITMUX_LOCK (splitmux);
+
+ gst_object_unref (pad);
+}
+
+/* Called with splitmux lock held to check if this output
+ * context needs to sleep to wait for the release of the
+ * next GOP, or to send EOS to close out the current file
+ */
+static void
+complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
+{
+ do {
+
+ GST_LOG_OBJECT (ctx->srcpad,
+ "Checking running time %" GST_TIME_FORMAT " against max %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
+ GST_TIME_ARGS (splitmux->max_out_running_time));
+
+ if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
+ ctx->out_running_time < splitmux->max_out_running_time)
+ return;
+
+ if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
+ return;
+
+ if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
+ if (ctx->out_eos == FALSE) {
+ send_eos (splitmux, ctx);
+ continue;
+ }
+ } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
+ start_next_fragment (splitmux);
+ continue;
+ }
+
+ GST_INFO_OBJECT (ctx->srcpad,
+ "Sleeping for running time %"
+ GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
+ GST_TIME_ARGS (ctx->out_running_time),
+ GST_TIME_ARGS (splitmux->max_out_running_time));
+ ctx->out_blocked = TRUE;
+ /* Expand the mq if needed before sleeping */
+ check_queue_length (splitmux, ctx);
+ GST_SPLITMUX_WAIT (splitmux);
+ ctx->out_blocked = FALSE;
+ GST_INFO_OBJECT (ctx->srcpad,
+ "Woken for new max running time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (splitmux->max_out_running_time));
+ } while (1);
+}
+
+static GstPadProbeReturn
+handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
+{
+ GstSplitMuxSink *splitmux = ctx->splitmux;
+ MqStreamBuf *buf_info = NULL;
+
+ GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+
+ /* FIXME: Handle buffer lists, until then make it clear they won't work */
+ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
+ g_warning ("Buffer list handling not implemented");
+ return GST_PAD_PROBE_DROP;
+ }
+ if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
+ GstEvent *event = gst_pad_probe_info_get_event (info);
+
+ GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT:
+ gst_event_copy_segment (event, &ctx->out_segment);
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ GST_SPLITMUX_LOCK (splitmux);
+ gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
+ g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
+ g_queue_clear (&ctx->queued_bufs);
+ ctx->flushing = FALSE;
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ case GST_EVENT_FLUSH_START:
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_LOG_OBJECT (pad, "Flush start");
+ ctx->flushing = TRUE;
+ GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ case GST_EVENT_EOS:
+ GST_SPLITMUX_LOCK (splitmux);
+ if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ goto beach;
+ ctx->out_eos = TRUE;
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ case GST_EVENT_GAP:{
+ GstClockTime gap_ts;
+
+ gst_event_parse_gap (event, &gap_ts, NULL);
+ if (gap_ts == GST_CLOCK_TIME_NONE)
+ break;
+
+ GST_SPLITMUX_LOCK (splitmux);
+
+ gap_ts = gst_segment_to_running_time (&ctx->out_segment,
+ GST_FORMAT_TIME, gap_ts);
+
+ GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (gap_ts));
+
+ if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ goto beach;
+ ctx->out_running_time = gap_ts;
+ complete_or_wait_on_out (splitmux, ctx);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ }
+ default:
+ break;
+ }
+ return GST_PAD_PROBE_PASS;
+ }
+
+ /* Allow everything through until the configured next stopping point */
+ GST_SPLITMUX_LOCK (splitmux);
+
+ buf_info = g_queue_pop_tail (&ctx->queued_bufs);
+ if (buf_info == NULL)
+ /* Can only happen due to a poorly timed flush */
+ goto beach;
+
+ /* If we have popped a keyframe, decrement the queued_gop count */
+ if (buf_info->keyframe && splitmux->queued_gops > 0)
+ splitmux->queued_gops--;
+
+ ctx->out_running_time = buf_info->run_ts;
+
+ GST_LOG_OBJECT (splitmux,
+ "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
+ " size %" G_GSIZE_FORMAT,
+ pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
+
+ complete_or_wait_on_out (splitmux, ctx);
+
+ if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
+ splitmux->muxed_out_time < buf_info->run_ts)
+ splitmux->muxed_out_time = buf_info->run_ts;
+
+ splitmux->muxed_out_bytes += buf_info->buf_size;
+
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
+ GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
+ " run ts %" GST_TIME_FORMAT, buf,
+ GST_TIME_ARGS (ctx->out_running_time));
+ }
+#endif
+
+ GST_SPLITMUX_UNLOCK (splitmux);
+
+ mq_stream_buf_free (buf_info);
+
+ return GST_PAD_PROBE_PASS;
+
+beach:
+ GST_SPLITMUX_UNLOCK (splitmux);
+ if (buf_info)
+ mq_stream_buf_free (buf_info);
+ return GST_PAD_PROBE_DROP;
+}
+
+static gboolean
+resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
+{
+ return gst_pad_send_event (peer, gst_event_ref (*event));
+}
+
+static void
+restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
+{
+ GstPad *peer = gst_pad_get_peer (ctx->srcpad);
+
+ gst_pad_sticky_events_foreach (ctx->srcpad,
+ (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
+
+ /* Clear EOS flag */
+ ctx->out_eos = FALSE;
+}
+
+/* Called with lock held when a fragment
+ * reaches EOS and it is time to restart
+ * a new fragment
+ */
+static void
+start_next_fragment (GstSplitMuxSink * splitmux)
+{
+ /* 1 change to new file */
+ gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
+ gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
+
+ set_next_filename (splitmux);
+
+ gst_element_sync_state_with_parent (splitmux->active_sink);
+ gst_element_sync_state_with_parent (splitmux->muxer);
+
+ g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
+
+ /* Switch state and go back to processing */
+ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+ if (!splitmux->video_ctx->in_eos)
+ splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
+ else
+ splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+
+ /* Store the overflow parameters as the basis for the next fragment */
+ splitmux->mux_start_time = splitmux->muxed_out_time;
+ splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
+
+ GST_DEBUG_OBJECT (splitmux,
+ "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (splitmux->max_out_running_time));
+
+ GST_SPLITMUX_BROADCAST (splitmux);
+}
+
+static void
+bus_handler (GstBin * bin, GstMessage * message)
+{
+ GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
+
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_EOS:
+ /* If the state is draining out the current file, drop this EOS */
+ GST_SPLITMUX_LOCK (splitmux);
+ if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
+ splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
+ GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
+ splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
+ GST_SPLITMUX_BROADCAST (splitmux);
+
+ gst_message_unref (message);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ return;
+ }
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ default:
+ break;
+ }
+
+ GST_BIN_CLASS (parent_class)->handle_message (bin, message);
+}
+
+/* Called with splitmux lock held */
+/* Called when entering ProcessingCompleteGop state
+ * Assess if mq contents overflowed the current file
+ * -> If yes, need to switch to new file
+ * -> if no, set max_out_running_time to let this GOP in and
+ * go to COLLECTING_GOP_START state
+ */
+static void
+handle_gathered_gop (GstSplitMuxSink * splitmux)
+{
+ GList *cur;
+ gsize queued_bytes = 0;
+ GstClockTime queued_time = 0;
+
+ /* Assess if the multiqueue contents overflowed the current file */
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+ if (tmpctx->in_running_time > queued_time)
+ queued_time = tmpctx->in_running_time;
+ queued_bytes += tmpctx->in_bytes;
+ }
+
+ g_assert (queued_bytes >= splitmux->mux_start_bytes);
+ g_assert (queued_time >= splitmux->mux_start_time);
+
+ queued_bytes -= splitmux->mux_start_bytes;
+ queued_time -= splitmux->mux_start_time;
+
+ /* Expand queued bytes estimate by muxer overhead */
+ queued_bytes += (queued_bytes * splitmux->mux_overhead);
+
+ GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
+ " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
+
+ /* Check for overrun - have we output at least one byte and overrun
+ * either threshold? */
+ if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) &&
+ ((splitmux->threshold_bytes > 0 &&
+ queued_bytes >= splitmux->threshold_bytes) ||
+ (splitmux->threshold_time > 0 &&
+ queued_time >= splitmux->threshold_time))) {
+
+ splitmux->state = SPLITMUX_STATE_ENDING_FILE;
+
+ GST_INFO_OBJECT (splitmux,
+ "mq overflowed since last, draining out. max out TS is %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
+ GST_SPLITMUX_BROADCAST (splitmux);
+
+ } else {
+ /* No overflow */
+ GST_LOG_OBJECT (splitmux,
+ "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
+ " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
+ splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
+ queued_bytes, GST_TIME_ARGS (queued_time));
+
+ /* Wake everyone up to push this one GOP, then sleep */
+ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+ if (!splitmux->video_ctx->in_eos)
+ splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
+ else
+ splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+
+ GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
+ GST_SPLITMUX_BROADCAST (splitmux);
+ }
+
+}
+
+/* Called with splitmux lock held */
+/* Called from each input pad when it is has all the pieces
+ * for a GOP or EOS, starting with the video pad which has set the
+ * splitmux->max_in_running_time
+ */
+static void
+check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
+{
+ GList *cur;
+ gboolean ready = TRUE;
+
+ if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
+ /* Iterate each pad, and check that the input running time is at least
+ * up to the video runnning time, and if so handle the collected GOP */
+ GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+
+ GST_LOG_OBJECT (splitmux,
+ "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
+ " EOS %d", tmpctx, tmpctx->srcpad,
+ GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
+
+ if (tmpctx->in_running_time < splitmux->max_in_running_time &&
+ !tmpctx->in_eos) {
+ GST_LOG_OBJECT (splitmux,
+ "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
+ tmpctx, tmpctx->srcpad);
+ ready = FALSE;
+ break;
+ }
+ }
+ if (ready) {
+ GST_DEBUG_OBJECT (splitmux,
+ "Collected GOP is complete. Processing (ctx %p)", ctx);
+ /* All pads have a complete GOP, release it into the multiqueue */
+ handle_gathered_gop (splitmux);
+ }
+ }
+
+ /* Some pad is not yet ready, or GOP is being pushed
+ * either way, sleep and wait to get woken */
+ while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
+ splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
+ !ctx->flushing) {
+
+ GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
+ splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
+ "GOP complete" : "EOF draining", ctx);
+ GST_SPLITMUX_WAIT (splitmux);
+
+ GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
+ }
+}
+
+static void
+check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
+{
+ GList *cur;
+ guint cur_len = g_queue_get_length (&ctx->queued_bufs);
+
+ GST_DEBUG_OBJECT (ctx->sinkpad,
+ "Checking queue length len %u cur_max %u queued gops %u",
+ cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
+
+ if (cur_len >= splitmux->mq_max_buffers) {
+ gboolean allow_grow = FALSE;
+
+ /* If collecting a GOP and this pad might block,
+ * and there isn't already a pending GOP in the queue
+ * then grow
+ */
+ if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
+ ctx->in_running_time < splitmux->max_in_running_time &&
+ splitmux->queued_gops <= 1) {
+ allow_grow = TRUE;
+ } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
+ ctx->is_video) {
+ allow_grow = TRUE;
+ }
+
+ if (!allow_grow) {
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+ GST_DEBUG_OBJECT (tmpctx->sinkpad,
+ " len %u out_blocked %d",
+ g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
+ /* If another stream is starving, grow */
+ if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
+ allow_grow = TRUE;
+ }
+ }
+ }
+
+ if (allow_grow) {
+ splitmux->mq_max_buffers = cur_len + 1;
+
+ GST_INFO_OBJECT (splitmux,
+ "Multiqueue overrun - enlarging to %u buffers ctx %p",
+ splitmux->mq_max_buffers, ctx);
+
+ g_object_set (splitmux->mq, "max-size-buffers",
+ splitmux->mq_max_buffers, NULL);
+ }
+ }
+}
+
+static GstPadProbeReturn
+handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
+{
+ GstSplitMuxSink *splitmux = ctx->splitmux;
+ GstBuffer *buf;
+ MqStreamBuf *buf_info;
+ GstClockTime ts;
+ gboolean loop_again;
+ gboolean keyframe = FALSE;
+
+ GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+
+ /* FIXME: Handle buffer lists, until then make it clear they won't work */
+ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
+ g_warning ("Buffer list handling not implemented");
+ return GST_PAD_PROBE_DROP;
+ }
+ if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
+ GstEvent *event = gst_pad_probe_info_get_event (info);
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT:
+ gst_event_copy_segment (event, &ctx->in_segment);
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ GST_SPLITMUX_LOCK (splitmux);
+ gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
+ ctx->in_eos = FALSE;
+ ctx->in_bytes = 0;
+ ctx->in_running_time = 0;
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ case GST_EVENT_EOS:
+ GST_SPLITMUX_LOCK (splitmux);
+ ctx->in_eos = TRUE;
+
+ if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ goto beach;
+
+ if (ctx->is_video) {
+ GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
+ /* Act as if this is a new keyframe with infinite timestamp */
+ splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
+ splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+ /* Wake up other input pads to collect this GOP */
+ GST_SPLITMUX_BROADCAST (splitmux);
+ check_completed_gop (splitmux, ctx);
+ } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
+ /* If we are waiting for a GOP to be completed (ie, for aux
+ * pads to catch up), then this pad is complete, so check
+ * if the whole GOP is.
+ */
+ check_completed_gop (splitmux, ctx);
+ }
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ default:
+ break;
+ }
+ return GST_PAD_PROBE_PASS;
+ }
+
+ buf = gst_pad_probe_info_get_buffer (info);
+ ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
+ GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
+ buf_info = mq_stream_buf_new ();
+
+ if (GST_BUFFER_PTS_IS_VALID (buf))
+ ts = GST_BUFFER_PTS (buf);
+ else
+ ts = GST_BUFFER_DTS (buf);
+
+ GST_SPLITMUX_LOCK (splitmux);
+
+ if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ goto beach;
+
+ /* If this buffer has a timestamp, advance the input timestamp of the
+ * stream */
+ if (GST_CLOCK_TIME_IS_VALID (ts)) {
+ GstClockTime running_time =
+ gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
+ GST_BUFFER_TIMESTAMP (buf));
+
+ if (GST_CLOCK_TIME_IS_VALID (running_time) &&
+ (ctx->in_running_time == GST_CLOCK_TIME_NONE
+ || running_time > ctx->in_running_time))
+ ctx->in_running_time = running_time;
+ }
+
+ /* Try to make sure we have a valid running time */
+ if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
+ ctx->in_running_time =
+ gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
+ ctx->in_segment.start);
+ }
+
+ buf_info->run_ts = ctx->in_running_time;
+ buf_info->buf_size = gst_buffer_get_size (buf);
+
+ /* Update total input byte counter for overflow detect */
+ ctx->in_bytes += buf_info->buf_size;
+
+ GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
+ " total in_bytes %" G_GSIZE_FORMAT,
+ GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+
+ loop_again = TRUE;
+ do {
+ if (ctx->flushing)
+ break;
+
+ switch (splitmux->state) {
+ case SPLITMUX_STATE_COLLECTING_GOP_START:
+ if (ctx->is_video) {
+ /* If a keyframe, we have a complete GOP */
+ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
+ !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
+ splitmux->max_in_running_time >= ctx->in_running_time) {
+ /* Pass this buffer through */
+ loop_again = FALSE;
+ break;
+ }
+ GST_INFO_OBJECT (pad,
+ "Have keyframe with running time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (ctx->in_running_time));
+ keyframe = TRUE;
+ splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+ splitmux->max_in_running_time = ctx->in_running_time;
+ /* Wake up other input pads to collect this GOP */
+ GST_SPLITMUX_BROADCAST (splitmux);
+ check_completed_gop (splitmux, ctx);
+ } else {
+ /* We're still waiting for a keyframe on the video pad, sleep */
+ GST_LOG_OBJECT (pad, "Sleeping for GOP start");
+ GST_SPLITMUX_WAIT (splitmux);
+ GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
+ splitmux->state);
+ }
+ break;
+ case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
+ /* After a GOP start is found, this buffer might complete the GOP */
+ /* If we overran the target timestamp, it might be time to process
+ * the GOP, otherwise bail out for more data
+ */
+ GST_LOG_OBJECT (pad,
+ "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (ctx->in_running_time),
+ GST_TIME_ARGS (splitmux->max_in_running_time));
+
+ if (ctx->in_running_time < splitmux->max_in_running_time) {
+ loop_again = FALSE;
+ break;
+ }
+
+ GST_LOG_OBJECT (pad,
+ "Collected last packet of GOP. Checking other pads");
+ check_completed_gop (splitmux, ctx);
+ break;
+ case SPLITMUX_STATE_ENDING_FILE:
+ case SPLITMUX_STATE_START_NEXT_FRAGMENT:
+ /* A fragment is ending, wait until that's done before continuing */
+ GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
+ GST_SPLITMUX_WAIT (splitmux);
+ GST_DEBUG_OBJECT (pad,
+ "Done sleeping for fragment restart state now %d", splitmux->state);
+ break;
+ default:
+ loop_again = FALSE;
+ break;
+ }
+ } while (loop_again);
+
+ if (keyframe) {
+ splitmux->queued_gops++;
+ buf_info->keyframe = TRUE;
+ }
+
+ /* Now add this buffer to the queue just before returning */
+ g_queue_push_head (&ctx->queued_bufs, buf_info);
+
+ /* Check the buffer will fit in the mq */
+ check_queue_length (splitmux, ctx);
+
+ GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
+ " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
+
+beach:
+ GST_SPLITMUX_UNLOCK (splitmux);
+
+ return GST_PAD_PROBE_PASS;
+}
+
+static GstPad *
+gst_splitmux_sink_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
+{
+ GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
+ GstPadTemplate *mux_template = NULL;
+ GstPad *res = NULL;
+ GstPad *mq_sink, *mq_src;
+ gchar *gname;
+ gboolean is_video = FALSE;
+ MqStreamCtx *ctx;
+
+ GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
+
+ GST_SPLITMUX_LOCK (splitmux);
+ if (!create_elements (splitmux))
+ goto fail;
+
+ if (templ->name_template) {
+ if (g_str_equal (templ->name_template, "video")) {
+ /* FIXME: Look for a pad template with matching caps, rather than by name */
+ mux_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+ (splitmux->muxer), "video_%u");
+ is_video = TRUE;
+ name = NULL;
+ } else {
+ mux_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+ (splitmux->muxer), templ->name_template);
+ }
+ }
+
+ res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
+ if (res == NULL)
+ goto fail;
+
+ if (is_video)
+ gname = g_strdup ("video");
+ else if (name == NULL)
+ gname = gst_pad_get_name (res);
+ else
+ gname = g_strdup (name);
+
+ if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
+ gst_element_release_request_pad (splitmux->muxer, res);
+ goto fail;
+ }
+
+ if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
+ gst_element_release_request_pad (splitmux->muxer, res);
+ gst_element_release_request_pad (splitmux->mq, mq_sink);
+ goto fail;
+ }
+
+ ctx = mq_stream_ctx_new (splitmux);
+ ctx->is_video = is_video;
+ ctx->srcpad = mq_src;
+ ctx->sinkpad = mq_sink;
+
+ mq_stream_ctx_ref (ctx);
+ ctx->src_pad_block_id =
+ gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+ (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
+ _pad_block_destroy_src_notify);
+ if (is_video)
+ splitmux->video_ctx = ctx;
+
+ res = gst_ghost_pad_new (gname, mq_sink);
+ g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
+
+ mq_stream_ctx_ref (ctx);
+ ctx->sink_pad_block_id =
+ gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+ (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
+ _pad_block_destroy_sink_notify);
+
+ GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
+ " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
+
+ splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
+
+ g_free (gname);
+
+ gst_object_unref (mq_sink);
+ gst_object_unref (mq_src);
+
+ gst_pad_set_active (res, TRUE);
+ gst_element_add_pad (element, res);
+ GST_SPLITMUX_UNLOCK (splitmux);
+
+ return res;
+fail:
+ GST_SPLITMUX_UNLOCK (splitmux);
+ return NULL;
+}
+
+static void
+gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
+{
+ GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
+ GstPad *mqsink, *mqsrc, *muxpad;
+ MqStreamCtx *ctx =
+ (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
+
+ GST_SPLITMUX_LOCK (splitmux);
+
+ if (splitmux->muxer == NULL || splitmux->mq == NULL)
+ goto fail; /* Elements don't exist yet - nothing to release */
+
+ GST_INFO_OBJECT (pad, "releasing request pad");
+
+ mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
+ mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
+ muxpad = gst_pad_get_peer (mqsrc);
+
+ /* Remove the context from our consideration */
+ splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
+
+ if (ctx->sink_pad_block_id)
+ gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
+
+ if (ctx->src_pad_block_id)
+ gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
+
+ /* Can release the context now */
+ mq_stream_ctx_unref (ctx);
+
+ /* Release and free the mq input */
+ gst_element_release_request_pad (splitmux->mq, mqsink);
+
+ /* Release and free the muxer input */
+ gst_element_release_request_pad (splitmux->muxer, muxpad);
+
+ gst_object_unref (mqsink);
+ gst_object_unref (mqsrc);
+ gst_object_unref (muxpad);
+
+ gst_element_remove_pad (element, pad);
+
+fail:
+ GST_SPLITMUX_UNLOCK (splitmux);
+}
+
+static GstElement *
+create_element (GstSplitMuxSink * splitmux,
+ const gchar * factory, const gchar * name)
+{
+ GstElement *ret = gst_element_factory_make (factory, name);
+ if (ret == NULL) {
+ g_warning ("Failed to create %s - splitmuxsink will not work", name);
+ return NULL;
+ }
+
+ if (!gst_bin_add (GST_BIN (splitmux), ret)) {
+ g_warning ("Could not add %s element - splitmuxsink will not work", name);
+ gst_object_unref (ret);
+ return NULL;
+ }
+
+ return ret;
+}
+
+static gboolean
+create_elements (GstSplitMuxSink * splitmux)
+{
+ /* Create internal elements */
+ if (splitmux->mq == NULL) {
+ if ((splitmux->mq =
+ create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
+ goto fail;
+
+ splitmux->mq_max_buffers = 5;
+ /* No bytes or time limit, we limit buffers manually */
+ g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
+ (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
+ }
+
+ if (splitmux->muxer == NULL) {
+ if (splitmux->provided_muxer == NULL) {
+ if ((splitmux->muxer =
+ create_element (splitmux, "mp4mux", "muxer")) == NULL)
+ goto fail;
+ } else {
+ splitmux->muxer = splitmux->provided_muxer;
+ if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) {
+ g_warning ("Could not add muxer element - splitmuxsink will not work");
+ goto fail;
+ }
+ }
+ }
+
+ return TRUE;
+fail:
+ return FALSE;
+}
+
+static GstElement *
+find_sink (GstElement * e)
+{
+ GstElement *res = NULL;
+ GstIterator *iter;
+ gboolean done = FALSE;
+ GValue data = { 0, };
+
+ if (!GST_IS_BIN (e))
+ return e;
+
+ iter = gst_bin_iterate_sinks (GST_BIN (e));
+ while (!done) {
+ switch (gst_iterator_next (iter, &data)) {
+ case GST_ITERATOR_OK:
+ {
+ GstElement *child = g_value_get_object (&data);
+ if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
+ "location") != NULL) {
+ res = child;
+ done = TRUE;
+ }
+ g_value_reset (&data);
+ break;
+ }
+ case GST_ITERATOR_RESYNC:
+ gst_iterator_resync (iter);
+ break;
+ case GST_ITERATOR_DONE:
+ done = TRUE;
+ break;
+ case GST_ITERATOR_ERROR:
+ g_assert_not_reached ();
+ break;
+ }
+ }
+ g_value_unset (&data);
+ gst_iterator_free (iter);
+
+ return res;
+}
+
+static gboolean
+create_sink (GstSplitMuxSink * splitmux)
+{
+ g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
+
+ if (splitmux->provided_sink == NULL) {
+ if ((splitmux->sink =
+ create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
+ goto fail;
+ splitmux->active_sink = splitmux->sink;
+ } else {
+ if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) {
+ g_warning ("Could not add sink elements - splitmuxsink will not work");
+ goto fail;
+ }
+
+ splitmux->active_sink = splitmux->provided_sink;
+
+ /* Find the sink element */
+ splitmux->sink = find_sink (splitmux->active_sink);
+ if (splitmux->sink == NULL) {
+ g_warning
+ ("Could not locate sink element in provided sink - splitmuxsink will not work");
+ goto fail;
+ }
+ }
+
+ if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
+ g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
+ goto fail;
+ }
+
+ return TRUE;
+fail:
+ return FALSE;
+}
+
+#ifdef __GNUC__
+#pragma GCC diagnostic ignored "-Wformat-nonliteral"
+#endif
+static void
+set_next_filename (GstSplitMuxSink * splitmux)
+{
+ if (splitmux->location) {
+ gchar *fname;
+
+ fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
+ GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
+ g_object_set (splitmux->sink, "location", fname, NULL);
+ g_free (fname);
+
+ splitmux->fragment_id++;
+ }
+}
+
+static GstStateChangeReturn
+gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn ret;
+ GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:{
+ GST_SPLITMUX_LOCK (splitmux);
+ if (!create_elements (splitmux) || !create_sink (splitmux)) {
+ ret = GST_STATE_CHANGE_FAILURE;
+ GST_SPLITMUX_UNLOCK (splitmux);
+ goto beach;
+ }
+ GST_SPLITMUX_UNLOCK (splitmux);
+ splitmux->fragment_id = 0;
+ set_next_filename (splitmux);
+ break;
+ }
+ case GST_STATE_CHANGE_READY_TO_PAUSED:{
+ GST_SPLITMUX_LOCK (splitmux);
+ /* Start by collecting one input on each pad */
+ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+ splitmux->max_in_running_time = 0;
+ splitmux->muxed_out_time = splitmux->mux_start_time = 0;
+ splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ }
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ GST_SPLITMUX_LOCK (splitmux);
+ splitmux->state = SPLITMUX_STATE_STOPPED;
+ /* Wake up any blocked threads */
+ GST_LOG_OBJECT (splitmux,
+ "State change -> NULL or READY. Waking threads");
+ GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ if (ret == GST_STATE_CHANGE_FAILURE)
+ goto beach;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ GST_SPLITMUX_LOCK (splitmux);
+ splitmux->fragment_id = 0;
+ gst_splitmux_reset (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ default:
+ break;
+ }
+
+beach:
+
+ if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
+ ret == GST_STATE_CHANGE_FAILURE) {
+ /* Cleanup elements on failed transition out of NULL */
+ gst_splitmux_reset (splitmux);
+ }
+ return ret;
+}
+
+gboolean
+register_splitmuxsink (GstPlugin * plugin)
+{
+ GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
+ "Split File Muxing Sink");
+
+ return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
+ GST_TYPE_SPLITMUX_SINK);
+}