summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago Santos <thiagoss@osg.samsung.com>2015-01-15 17:44:45 -0300
committerThiago Santos <thiagoss@osg.samsung.com>2015-01-16 15:00:11 -0300
commit941a26ab49e9df8776c4738976c497d644cce60b (patch)
treebb5d5988b267c791472fe7a03cc5bb0bd6e58999
parent207cb9aef8f70ddf6355fe5db796522cd3134645 (diff)
adaptivedemux: refactor chunk_received functionadaptivedemux
-rw-r--r--ext/dash/gstdashdemux.c185
-rw-r--r--ext/dash/gstdashdemux.h2
-rw-r--r--ext/hls/gsthlsdemux.c73
-rw-r--r--ext/hls/gsthlsdemux.h2
-rw-r--r--gst-libs/gst/adaptivedemux/gstadaptivedemux.c239
-rw-r--r--gst-libs/gst/adaptivedemux/gstadaptivedemux.h29
6 files changed, 277 insertions, 253 deletions
diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c
index d144df5e8..c5c7e43df 100644
--- a/ext/dash/gstdashdemux.c
+++ b/ext/dash/gstdashdemux.c
@@ -200,7 +200,7 @@ static GstFlowReturn gst_dash_demux_stream_seek (GstAdaptiveDemuxStream *
stream, GstClockTime ts);
static GstFlowReturn
gst_dash_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream);
-static void
+static gboolean
gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream);
static gboolean gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream *
stream, guint64 bitrate);
@@ -213,8 +213,11 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream *
stream);
static void gst_dash_demux_advance_period (GstAdaptiveDemux * demux);
static gboolean gst_dash_demux_has_next_period (GstAdaptiveDemux * demux);
-static GstFlowReturn gst_dash_demux_chunk_received (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream, GstBuffer ** chunk);
+static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream);
+static GstFlowReturn
+gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream);
/* GstDashDemux */
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
@@ -600,7 +603,8 @@ gst_dash_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) {
klass = GST_ADAPTIVE_DEMUX_GET_CLASS (dashdemux);
- klass->chunk_received = gst_dash_demux_chunk_received;
+ klass->data_received = gst_dash_demux_data_received;
+ klass->finish_fragment = gst_dash_demux_stream_fragment_finished;
}
if (gst_mpd_client_setup_media_presentation (dashdemux->client)) {
@@ -882,7 +886,7 @@ gst_dash_demux_stream_seek (GstAdaptiveDemuxStream * stream, GstClockTime ts)
return GST_FLOW_OK;
}
-static void
+static gboolean
gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream)
{
GstDashDemuxStream *dashstream = (GstDashDemuxStream *) stream;
@@ -905,6 +909,7 @@ gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream)
if (!fragment_finished) {
dashstream->sidx_current_remaining = sidx->entries[sidx->entry_index].size;
}
+ return !fragment_finished;
}
static GstFlowReturn
@@ -914,17 +919,8 @@ gst_dash_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream)
GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (stream->demux);
if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) {
- GstSidxBox *sidx = SIDX (dashstream);
-
- if (stream->demux->segment.rate > 0.0) {
- if (sidx->entry_index < sidx->entries_count) {
- return GST_FLOW_OK;
- }
- } else {
- if (sidx->entry_index >= 0) {
- return GST_FLOW_OK;
- }
- }
+ if (gst_dash_demux_stream_advance_subfragment (stream))
+ return GST_FLOW_OK;
}
return gst_mpd_client_advance_segment (dashdemux->client,
@@ -986,22 +982,14 @@ gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream,
if (gst_mpd_client_has_isoff_ondemand_profile (demux->client)) {
- /* a new subsegment is going to start, cleanup any pending data from the
- * previous one */
+ /* store our current position to change to the same one in a different
+ * representation if needed */
dashstream->sidx_index = SIDX (dashstream)->entry_index;
- if (dashstream->pending_buffer) {
- gst_buffer_unref (dashstream->pending_buffer);
- dashstream->pending_buffer = NULL;
- }
-
if (ret) {
/* TODO cache indexes to avoid re-downloading and parsing */
/* if we switched, we need a new index */
gst_isoff_sidx_parser_clear (&dashstream->sidx_parser);
gst_isoff_sidx_parser_init (&dashstream->sidx_parser);
- } else {
- dashstream->sidx_current_remaining =
- SIDX_ENTRY (dashstream, dashstream->sidx_index)->size;
}
}
@@ -1071,11 +1059,6 @@ gst_dash_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
gst_isoff_sidx_parser_clear (&dashstream->sidx_parser);
gst_isoff_sidx_parser_init (&dashstream->sidx_parser);
}
-
- if (dashstream->pending_buffer) {
- gst_buffer_unref (dashstream->pending_buffer);
- dashstream->pending_buffer = NULL;
- }
gst_dash_demux_stream_seek (iter->data, target_pos);
}
return TRUE;
@@ -1247,88 +1230,112 @@ gst_dash_demux_advance_period (GstAdaptiveDemux * demux)
}
static GstBuffer *
-_gst_buffer_split (GstBuffer ** buffer, gint offset, gsize size)
+_gst_buffer_split (GstBuffer * buffer, gint offset, gsize size)
{
- GstBuffer *newbuf = gst_buffer_copy_region (*buffer,
+ GstBuffer *newbuf = gst_buffer_copy_region (buffer,
GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS | GST_BUFFER_COPY_META
| GST_BUFFER_COPY_MEMORY, offset, size - offset);
- gst_buffer_resize (*buffer, 0, offset);
+ gst_buffer_resize (buffer, 0, offset);
return newbuf;
}
static GstFlowReturn
-gst_dash_demux_chunk_received (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream, GstBuffer ** chunk)
+gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
{
- GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
- GstFlowReturn ret = GST_FLOW_OK;
+ GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (demux);
- if (*chunk == NULL) {
- if (dash_stream->pending_buffer) {
- *chunk = dash_stream->pending_buffer;
- dash_stream->pending_buffer = NULL;
- }
+ if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) {
+ /* fragment is advanced on data_received when byte limits are reached */
return GST_FLOW_OK;
}
- if (dash_stream->pending_buffer) {
- *chunk = gst_buffer_append (dash_stream->pending_buffer, *chunk);
- dash_stream->pending_buffer = NULL;
- }
+ return gst_adaptive_demux_stream_advance_fragment (demux, stream,
+ stream->fragment.duration);
+}
+
+static GstFlowReturn
+gst_dash_demux_data_received (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
+{
+ GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstBuffer *buffer;
+ gsize available;
- if (stream->downloading_index
- && dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) {
+ if (stream->downloading_index) {
GstIsoffParserResult res;
guint consumed;
- res =
- gst_isoff_sidx_parser_add_buffer (&dash_stream->sidx_parser, *chunk,
- &consumed);
+ available = gst_adapter_available (stream->adapter);
+ buffer = gst_adapter_take_buffer (stream->adapter, available);
- if (res == GST_ISOFF_PARSER_ERROR) {
- } else if (res == GST_ISOFF_PARSER_UNEXPECTED) {
- /* this is not a 'sidx' index, just skip it and continue playback */
- } else {
- /* when finished, prepare for real data streaming */
- if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
- if (GST_CLOCK_TIME_IS_VALID (dash_stream->pending_seek_ts)) {
- gst_dash_demux_stream_sidx_seek (dash_stream,
- dash_stream->pending_seek_ts);
- dash_stream->pending_seek_ts = GST_CLOCK_TIME_NONE;
- } else {
- SIDX (dash_stream)->entry_index = dash_stream->sidx_index;
+ if (dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) {
+ res =
+ gst_isoff_sidx_parser_add_buffer (&dash_stream->sidx_parser, buffer,
+ &consumed);
+
+ if (res == GST_ISOFF_PARSER_ERROR) {
+ } else if (res == GST_ISOFF_PARSER_UNEXPECTED) {
+ /* this is not a 'sidx' index, just skip it and continue playback */
+ } else {
+ /* when finished, prepare for real data streaming */
+ if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
+ if (GST_CLOCK_TIME_IS_VALID (dash_stream->pending_seek_ts)) {
+ gst_dash_demux_stream_sidx_seek (dash_stream,
+ dash_stream->pending_seek_ts);
+ dash_stream->pending_seek_ts = GST_CLOCK_TIME_NONE;
+ } else {
+ SIDX (dash_stream)->entry_index = dash_stream->sidx_index;
+ }
+ dash_stream->sidx_current_remaining =
+ SIDX_CURRENT_ENTRY (dash_stream)->size;
+ } else if (consumed < available) {
+ GstBuffer *pending;
+ /* we still need to keep some data around for the next parsing round
+ * so just push what was already processed by the parser */
+ pending = _gst_buffer_split (buffer, consumed, -1);
+ gst_adapter_push (stream->adapter, pending);
}
- dash_stream->sidx_current_remaining =
- SIDX_CURRENT_ENTRY (dash_stream)->size;
- } else if (consumed < gst_buffer_get_size (*chunk)) {
- dash_stream->pending_buffer = _gst_buffer_split (chunk, consumed, -1);
}
}
- }
+ ret = gst_adaptive_demux_stream_push_buffer (stream, buffer);
+ } else if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
- /* check our position in subsegments */
- if (!stream->downloading_index
- && dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
- gsize size = gst_buffer_get_size (*chunk);
-
- GST_LOG_OBJECT (stream->pad,
- "Received buffer of size: %" G_GSIZE_FORMAT
- " - remaining in subsegment: %" G_GSIZE_FORMAT, size,
- dash_stream->sidx_current_remaining);
- if (size < dash_stream->sidx_current_remaining) {
- dash_stream->sidx_current_remaining -= size;
- } else if (size >= dash_stream->sidx_current_remaining) {
- if (size > dash_stream->sidx_current_remaining) {
- dash_stream->pending_buffer =
- _gst_buffer_split (chunk, dash_stream->sidx_current_remaining,
- size);
- }
+ while (ret == GST_FLOW_OK
+ && ((available = gst_adapter_available (stream->adapter)) > 0)) {
+ gboolean advance = FALSE;
- gst_dash_demux_stream_advance_subfragment (stream);
- ret = GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END;
+ if (available < dash_stream->sidx_current_remaining) {
+ buffer = gst_adapter_take_buffer (stream->adapter, available);
+ dash_stream->sidx_current_remaining -= available;
+ } else {
+ buffer =
+ gst_adapter_take_buffer (stream->adapter,
+ dash_stream->sidx_current_remaining);
+ dash_stream->sidx_current_remaining = 0;
+ advance = TRUE;
+ }
+ ret = gst_adaptive_demux_stream_push_buffer (stream, buffer);
+ if (advance) {
+ GstFlowReturn new_ret;
+ new_ret =
+ gst_adaptive_demux_stream_advance_fragment (demux, stream,
+ SIDX_CURRENT_ENTRY (dash_stream)->duration);
+
+ /* only overwrite if it was OK before */
+ if (ret == GST_FLOW_OK)
+ ret = new_ret;
+ }
}
+ } else {
+ /* this should be the main header, just push it all */
+ ret =
+ gst_adaptive_demux_stream_push_buffer (stream,
+ gst_adapter_take_buffer (stream->adapter,
+ gst_adapter_available (stream->adapter)));
}
return ret;
@@ -1340,6 +1347,4 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream)
GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
gst_isoff_sidx_parser_clear (&dash_stream->sidx_parser);
- if (dash_stream->pending_buffer)
- gst_buffer_unref (dash_stream->pending_buffer);
}
diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h
index fb1d81dc9..7abd073ec 100644
--- a/ext/dash/gstdashdemux.h
+++ b/ext/dash/gstdashdemux.h
@@ -65,8 +65,6 @@ struct _GstDashDemuxStream
GstMediaFragmentInfo current_fragment;
- GstBuffer *pending_buffer;
-
/* index parsing */
GstSidxParser sidx_parser;
gsize sidx_current_remaining;
diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c
index c4e589dba..934bebabb 100644
--- a/ext/hls/gsthlsdemux.c
+++ b/ext/hls/gsthlsdemux.c
@@ -111,10 +111,10 @@ static gboolean gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek);
static gboolean
gst_hls_demux_start_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
-static void gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream, GstBuffer ** buffer);
-static GstFlowReturn gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream, GstBuffer ** chunk);
+static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream);
+static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream);
static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream *
stream);
static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream *
@@ -216,7 +216,7 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass)
adaptivedemux_class->start_fragment = gst_hls_demux_start_fragment;
adaptivedemux_class->finish_fragment = gst_hls_demux_finish_fragment;
- adaptivedemux_class->chunk_received = gst_hls_demux_chunk_received;
+ adaptivedemux_class->data_received = gst_hls_demux_data_received;
GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0,
"hlsdemux element");
@@ -289,9 +289,6 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux));
gst_uri_downloader_reset (demux->downloader);
break;
- case GST_STATE_CHANGE_NULL_TO_READY:
- demux->adapter = gst_adapter_new ();
- break;
default:
break;
}
@@ -302,10 +299,6 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux));
break;
- case GST_STATE_CHANGE_READY_TO_NULL:
- gst_object_unref (demux->adapter);
- demux->adapter = NULL;
- break;
default:
break;
}
@@ -364,11 +357,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
/* properly cleanup pending decryption status */
if (flags & GST_SEEK_FLAG_FLUSH) {
- if (hlsdemux->adapter)
- gst_adapter_clear (hlsdemux->adapter);
- if (hlsdemux->pending_buffer)
- gst_buffer_unref (hlsdemux->pending_buffer);
- hlsdemux->pending_buffer = NULL;
gst_hls_demux_decrypt_end (hlsdemux);
}
@@ -593,9 +581,9 @@ key_failed:
return FALSE;
}
-static void
+static GstFlowReturn
gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream, GstBuffer ** buffer)
+ GstAdaptiveDemuxStream * stream)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
@@ -605,8 +593,8 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
/* ideally this should be empty, but this eos might have been
* caused by an error on the source element */
GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received"
- ": %" G_GSIZE_FORMAT, gst_adapter_available (hlsdemux->adapter));
- gst_adapter_clear (hlsdemux->adapter);
+ ": %" G_GSIZE_FORMAT, gst_adapter_available (stream->adapter));
+ gst_adapter_clear (stream->adapter);
/* pending buffer is only used for encrypted streams */
if (stream->last_ret == GST_FLOW_OK) {
@@ -621,40 +609,41 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
gst_buffer_resize (hlsdemux->pending_buffer, 0, unpadded_size);
- *buffer = hlsdemux->pending_buffer;
- hlsdemux->pending_buffer = NULL;
+ gst_adaptive_demux_stream_push_buffer (stream, hlsdemux->pending_buffer);
}
} else {
if (hlsdemux->pending_buffer)
gst_buffer_unref (hlsdemux->pending_buffer);
hlsdemux->pending_buffer = NULL;
}
+
+ return gst_adaptive_demux_stream_advance_fragment (demux, stream,
+ stream->fragment.duration);
}
static GstFlowReturn
-gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream, GstBuffer ** chunk)
+gst_hls_demux_data_received (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
- GstBuffer *buffer = *chunk;
+ gsize available;
+ GstBuffer *buffer = NULL;
+
+ available = gst_adapter_available (stream->adapter);
/* Is it encrypted? */
if (hlsdemux->current_key) {
GError *err = NULL;
GstBuffer *tmp_buffer;
- gsize available;
-
- gst_adapter_push (hlsdemux->adapter, buffer);
- *chunk = NULL;
/* must be a multiple of 16 */
- available = gst_adapter_available (hlsdemux->adapter) & (~0xF);
+ available = available & (~0xF);
if (available == 0) {
return GST_FLOW_OK;
}
- buffer = gst_adapter_take_buffer (hlsdemux->adapter, available);
+ buffer = gst_adapter_take_buffer (stream->adapter, available);
buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err);
if (buffer == NULL) {
GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"),
@@ -665,14 +654,15 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
tmp_buffer = hlsdemux->pending_buffer;
hlsdemux->pending_buffer = buffer;
- *chunk = tmp_buffer;
- } else if (hlsdemux->pending_buffer) {
- *chunk = gst_buffer_append (hlsdemux->pending_buffer, buffer);
- hlsdemux->pending_buffer = NULL;
+ buffer = tmp_buffer;
+ } else {
+ buffer = gst_adapter_take_buffer (stream->adapter, available);
+ if (hlsdemux->pending_buffer) {
+ buffer = gst_buffer_append (hlsdemux->pending_buffer, buffer);
+ hlsdemux->pending_buffer = NULL;
+ }
}
- buffer = *chunk;
-
if (G_UNLIKELY (hlsdemux->do_typefind && buffer != NULL)) {
GstCaps *caps = NULL;
GstMapInfo info;
@@ -697,6 +687,7 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
if (buffer_size > (2 * 1024 * 1024)) {
GST_ELEMENT_ERROR (hlsdemux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL));
+ gst_buffer_unref (buffer);
return GST_FLOW_NOT_NEGOTIATED;
} else {
if (hlsdemux->pending_buffer)
@@ -704,7 +695,6 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
gst_buffer_append (buffer, hlsdemux->pending_buffer);
else
hlsdemux->pending_buffer = buffer;
- *chunk = NULL;
return GST_FLOW_OK;
}
}
@@ -722,6 +712,9 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
hlsdemux->do_typefind = FALSE;
}
+ if (buffer) {
+ return gst_adaptive_demux_stream_push_buffer (stream, buffer);
+ }
return GST_FLOW_OK;
}
@@ -842,8 +835,6 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
demux->client = gst_m3u8_client_new ("", NULL);
demux->srcpad_counter = 0;
- if (demux->adapter)
- gst_adapter_clear (demux->adapter);
if (demux->pending_buffer)
gst_buffer_unref (demux->pending_buffer);
demux->pending_buffer = NULL;
diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h
index 20fe98fe0..c76cd0e56 100644
--- a/ext/hls/gsthlsdemux.h
+++ b/ext/hls/gsthlsdemux.h
@@ -25,7 +25,6 @@
#define __GST_HLS_DEMUX_H__
#include <gst/gst.h>
-#include <gst/base/gstadapter.h>
#include "m3u8.h"
#include "gstfragmented.h"
#include <gst/uridownloader/gsturidownloader.h>
@@ -96,7 +95,6 @@ struct _GstHLSDemux
#endif
gchar *current_key;
guint8 *current_iv;
- GstAdapter *adapter; /* used to accumulate 16 bytes multiple chunks */
GstBuffer *pending_buffer; /* decryption scenario:
* the last buffer can only be pushed when
* resized, so need to store and wait for
diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
index b09c53e1e..70f10d105 100644
--- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
+++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
@@ -87,6 +87,11 @@ GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
+enum GstAdaptiveDemuxFlowReturn
+{
+ GST_ADAPTIVE_DEMUX_FLOW_SWITCH = GST_FLOW_CUSTOM_SUCCESS_2 + 1
+};
+
struct _GstAdaptiveDemuxPrivate
{
GstAdapter *input_adapter;
@@ -141,9 +146,6 @@ static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime ts);
static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
demux, GstAdaptiveDemuxStream * stream);
-static GstFlowReturn
-gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream);
static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
static GstFlowReturn
@@ -171,6 +173,12 @@ static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
static void
gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
stream, GstFlowReturn ret, GError * err);
+static GstFlowReturn
+gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream);
+static GstFlowReturn
+gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream);
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@@ -223,6 +231,9 @@ gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
gstelement_class->change_state = gst_adaptive_demux_change_state;
gstbin_class->handle_message = gst_adaptive_demux_handle_message;
+
+ klass->data_received = gst_adaptive_demux_stream_data_received_default;
+ klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
}
static void
@@ -704,6 +715,7 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
gst_segment_init (&stream->segment, GST_FORMAT_TIME);
g_cond_init (&stream->fragment_download_cond);
g_mutex_init (&stream->fragment_download_lock);
+ stream->adapter = gst_adapter_new ();
demux->next_streams = g_list_append (demux->next_streams, stream);
@@ -763,6 +775,9 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
}
if (stream->pending_caps)
gst_caps_unref (stream->pending_caps);
+
+ g_object_unref (stream->adapter);
+
g_free (stream);
}
@@ -1110,6 +1125,7 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
gst_task_join (stream->download_task);
stream->download_error_count = 0;
stream->need_header = TRUE;
+ gst_adapter_clear (stream->adapter);
}
gst_task_join (demux->priv->updates_task);
}
@@ -1202,54 +1218,13 @@ gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
return GST_FLOW_OK;
}
-static GstFlowReturn
-_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+GstFlowReturn
+gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
+ GstBuffer * buffer)
{
- GstPad *srcpad = (GstPad *) parent;
- GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (srcpad);
GstAdaptiveDemux *demux = stream->demux;
- GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
GstFlowReturn ret = GST_FLOW_OK;
gboolean discont = FALSE;
- gboolean subsegment_end = FALSE;
-
- if (stream->starting_fragment) {
- stream->starting_fragment = FALSE;
- if (klass->start_fragment) {
- klass->start_fragment (demux, stream);
- }
-
- GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
-
- GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
- GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
-
- if (GST_BUFFER_PTS_IS_VALID (buffer))
- stream->segment.position = GST_BUFFER_PTS (buffer);
-
- } else {
- GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
- }
-
- /* The subclass might need to decrypt or modify the buffer somehow
- * before processing it */
- if (klass->chunk_received) {
- ret = klass->chunk_received (demux, stream, &buffer);
- if (ret != GST_FLOW_OK) {
- if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END) {
- ret = GST_FLOW_OK;
- subsegment_end = TRUE;
- } else {
- if (buffer)
- gst_buffer_unref (buffer);
- gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
- return ret;
- }
- }
- }
-
- if (buffer == NULL)
- return ret;
if (stream->first_fragment_buffer) {
if (demux->segment.rate < 0)
@@ -1310,9 +1285,65 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
stream->pending_tags = NULL;
}
- ret = gst_proxy_pad_chain_default (pad, parent, buffer);
+ ret = gst_pad_push (stream->pad, buffer);
stream->download_start_time = g_get_monotonic_time ();
- GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret));
+ GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
+ gst_flow_get_name (ret));
+
+ return ret;
+}
+
+static GstFlowReturn
+gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
+{
+ return gst_adaptive_demux_stream_advance_fragment (demux, stream,
+ stream->fragment.duration);
+}
+
+static GstFlowReturn
+gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
+{
+ GstBuffer *buffer;
+
+ buffer = gst_adapter_take_buffer (stream->adapter,
+ gst_adapter_available (stream->adapter));
+ return gst_adaptive_demux_stream_push_buffer (stream, buffer);
+}
+
+static GstFlowReturn
+_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstPad *srcpad = (GstPad *) parent;
+ GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (srcpad);
+ GstAdaptiveDemux *demux = stream->demux;
+ GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ if (stream->starting_fragment) {
+ stream->starting_fragment = FALSE;
+ if (klass->start_fragment) {
+ klass->start_fragment (demux, stream);
+ }
+
+ GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
+
+ GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
+
+ if (GST_BUFFER_PTS_IS_VALID (buffer))
+ stream->segment.position = GST_BUFFER_PTS (buffer);
+
+ } else {
+ GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
+ }
+
+ gst_adapter_push (stream->adapter, buffer);
+ GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT
+ ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer),
+ gst_adapter_available (stream->adapter));
+ ret = klass->data_received (demux, stream);
if (ret != GST_FLOW_OK) {
if (ret < GST_FLOW_EOS) {
@@ -1327,34 +1358,13 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
}
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
- } else if (subsegment_end) {
- /* tell upstream that we are done here */
- ret = GST_FLOW_EOS;
+ if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH)
+ ret = GST_FLOW_EOS; /* return EOS to make the source stop */
}
return ret;
}
-static GstFlowReturn
-gst_adaptive_demux_stream_fragment_eos (GstAdaptiveDemuxStream * stream)
-{
- GstAdaptiveDemux *demux = stream->demux;
- GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
- GstBuffer *buffer = NULL;
- GstFlowReturn ret = GST_FLOW_OK;
-
- if (klass->finish_fragment) {
- klass->finish_fragment (demux, stream, &buffer);
- if (buffer) {
- stream->download_total_time +=
- g_get_monotonic_time () - stream->download_start_time;
- stream->download_total_bytes += gst_buffer_get_size (buffer);
- ret = gst_pad_push (stream->pad, buffer);
- }
- }
- return ret;
-}
-
static void
gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
stream, GstFlowReturn ret, GError * err)
@@ -1386,9 +1396,11 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:{
+ GstAdaptiveDemuxClass *klass;
GstFlowReturn ret;
- ret = gst_adaptive_demux_stream_fragment_eos (stream);
+ klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
+ ret = klass->finish_fragment (stream->demux, stream);
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
break;
}
@@ -1758,8 +1770,6 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
}
}
-
-
return ret;
no_url_error:
@@ -1932,34 +1942,6 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
GST_MANIFEST_LOCK (demux);
}
- if (ret == GST_FLOW_OK) {
- stream->download_error_count = 0;
- g_clear_error (&stream->last_error);
- if (GST_CLOCK_TIME_IS_VALID (stream->fragment.duration))
- stream->segment.position += stream->fragment.duration;
- ret = gst_adaptive_demux_stream_advance_fragment (demux, stream);
-
- if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
- gst_adaptive_demux_stream_update_current_bitrate (stream))) {
- stream->need_header = TRUE;
- }
-
- /* the subclass might want to switch pads */
- if (G_UNLIKELY (demux->next_streams)) {
- gst_task_stop (stream->download_task);
- /* TODO only allow switching streams if other downloads are not ongoing */
- GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
- "to do bitrate switching");
- gst_adaptive_demux_expose_streams (demux, FALSE);
- gst_adaptive_demux_start_tasks (demux);
- ret = GST_FLOW_EOS;
- GST_MANIFEST_UNLOCK (demux);
- goto end_of_manifest;
- }
- }
-
-
- stream->last_ret = ret;
switch (ret) {
case GST_FLOW_OK:
break; /* all is good, let's go */
@@ -2229,15 +2211,60 @@ gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
return ret;
}
-static GstFlowReturn
+GstFlowReturn
gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
- GstAdaptiveDemuxStream * stream)
+ GstAdaptiveDemuxStream * stream, GstClockTime duration)
+{
+ GstFlowReturn ret;
+
+ GST_MANIFEST_LOCK (demux);
+ if (stream->last_ret == GST_FLOW_OK) {
+ stream->last_ret =
+ gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
+ duration);
+ }
+ ret = stream->last_ret;
+ GST_MANIFEST_UNLOCK (demux);
+
+ return ret;
+}
+
+GstFlowReturn
+gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream, GstClockTime duration)
{
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+ GstFlowReturn ret;
g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
- return klass->stream_advance_fragment (stream);
+ stream->download_error_count = 0;
+ g_clear_error (&stream->last_error);
+ if (GST_CLOCK_TIME_IS_VALID (duration))
+ stream->segment.position += duration;
+ ret = klass->stream_advance_fragment (stream);
+
+ if (ret == GST_FLOW_OK) {
+ if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
+ gst_adaptive_demux_stream_update_current_bitrate (stream))) {
+ stream->need_header = TRUE;
+ gst_adapter_clear (stream->adapter);
+ ret = GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
+ }
+
+ /* the subclass might want to switch pads */
+ if (G_UNLIKELY (demux->next_streams)) {
+ gst_task_stop (stream->download_task);
+ /* TODO only allow switching streams if other downloads are not ongoing */
+ GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
+ "to do bitrate switching");
+ gst_adaptive_demux_expose_streams (demux, FALSE);
+ gst_adaptive_demux_start_tasks (demux);
+ ret = GST_FLOW_EOS;
+ }
+ }
+
+ return ret;
}
static gboolean
diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h
index d650fe47c..2f28cdcca 100644
--- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h
+++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h
@@ -115,6 +115,8 @@ struct _GstAdaptiveDemuxStream
GstSegment segment;
+ GstAdapter *adapter;
+
GstCaps *pending_caps;
GstEvent *pending_segment;
GstTagList *pending_tags;
@@ -335,28 +337,23 @@ struct _GstAdaptiveDemuxClass
* finish_fragment:
* @demux: #GstAdaptiveDemux
* @stream: #GstAdaptiveDemuxStream
- * @buffer: Pointer to store and pending data that should be pushed.
*
* Notifies the subclass that a fragment download was finished.
- * It can be used to cleanup internal state after a fragment and also
- * provides a pointer for the subclass to return some pending data
- * that should be pushed before starting the next fragment. This
- * covers the use case of finishing the decryption of the last chunk
- * of an encrypted fragment.
+ * It can be used to cleanup internal state after a fragment and
+ * also push any pending data before moving to the next fragment.
*/
- void (*finish_fragment) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer ** buffer);
+ GstFlowReturn (*finish_fragment) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream);
/**
- * chunk_received:
+ * data_received:
* @demux: #GstAdaptiveDemux
* @stream: #GstAdaptiveDemuxStream
- * @buffer: Pointer containing the received chunk, also used to return modified data
*
- * Notifies the subclass that a fragment chunk was downloaded. The subclass can
- * modify the buffer and return a new one if needed. Used for decryption.
+ * Notifies the subclass that a fragment chunk was downloaded. The subclass
+ * can look at the data at the adapter and modify/push data as desired.
*
* Returns: #GST_FLOW_OK if successful, #GST_FLOW_ERROR in case of error.
*/
- GstFlowReturn (*chunk_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer ** buffer);
+ GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream);
/**
* get_live_seek_range:
@@ -385,6 +382,14 @@ void gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
GstTagList * tags);
void gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f);
+GstFlowReturn gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
+GstFlowReturn
+gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream, GstClockTime duration);
+GstFlowReturn
+gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream, GstClockTime duration);
+
G_END_DECLS
#endif