diff options
-rw-r--r-- | gst/mpegtsdemux/mpegtsbase.c | 101 | ||||
-rw-r--r-- | gst/mpegtsdemux/mpegtsbase.h | 9 | ||||
-rw-r--r-- | gst/mpegtsdemux/mpegtspacketizer.c | 18 | ||||
-rw-r--r-- | gst/mpegtsdemux/mpegtspacketizer.h | 1 | ||||
-rw-r--r-- | gst/mpegtsdemux/tsdemux.c | 709 | ||||
-rw-r--r-- | gst/mpegtsdemux/tsdemux.h | 17 |
6 files changed, 805 insertions, 50 deletions
diff --git a/gst/mpegtsdemux/mpegtsbase.c b/gst/mpegtsdemux/mpegtsbase.c index f64b588f1..c7d258796 100644 --- a/gst/mpegtsdemux/mpegtsbase.c +++ b/gst/mpegtsdemux/mpegtsbase.c @@ -187,6 +187,8 @@ mpegts_base_class_init (MpegTSBaseClass * klass) static void mpegts_base_reset (MpegTSBase * base) { + MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base); + mpegts_packetizer_clear (base->packetizer); memset (base->is_pes, 0, 8192); memset (base->known_psi, 0, 8192); @@ -206,6 +208,8 @@ mpegts_base_reset (MpegTSBase * base) /* base->pat = NULL; */ /* pmt pids will be added and removed dynamically */ + if (klass->reset) + klass->reset (base); } static void @@ -1013,8 +1017,8 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event) gst_event_unref (event); res = FALSE; break; - case GST_EVENT_FLUSH_STOP: - mpegts_packetizer_clear (base->packetizer); + case GST_EVENT_FLUSH_START: + mpegts_packetizer_flush (base->packetizer); /* Passthrough */ default: res = GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, event); @@ -1181,6 +1185,9 @@ mpegts_base_loop (MpegTSBase * base) goto error; } break; + case BASE_MODE_PUSHING: + GST_WARNING ("wrong BASE_MODE_PUSHING mode in pull loop"); + break; } return; @@ -1201,6 +1208,92 @@ error: } } + +gboolean +mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad, + GstEvent * event) +{ + MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base); + GstFlowReturn ret = GST_FLOW_ERROR; + gdouble rate; + gboolean flush; + GstFormat format; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + + gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); + + if (format != GST_FORMAT_TIME) + return FALSE; + + GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT + " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), + GST_TIME_ARGS (stop)); + + flush = flags & GST_SEEK_FLAG_FLUSH; + + if (base->mode == BASE_MODE_PUSHING) { + GST_ERROR ("seeking in push mode not supported"); + goto done; + } + + /* stop streaming, either by flushing or by pausing the task */ + base->mode = BASE_MODE_SEEKING; + if (flush) { + GST_DEBUG_OBJECT (base, "sending flush start"); + gst_pad_push_event (base->sinkpad, gst_event_new_flush_start ()); + GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, + gst_event_new_flush_start ()); + } else + gst_pad_pause_task (base->sinkpad); + /* wait for streaming to finish */ + GST_PAD_STREAM_LOCK (base->sinkpad); + + if (flush) { + /* send a FLUSH_STOP for the sinkpad, since we need data for seeking */ + GST_DEBUG_OBJECT (base, "sending flush stop"); + gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ()); + } + + if (flags & (GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SEGMENT | + GST_SEEK_FLAG_SKIP)) { + GST_WARNING ("seek flags 0x%x are not supported", (int) flags); + goto done; + } + + + if (format == GST_FORMAT_TIME) { + /* If the subclass can seek, do that */ + if (klass->seek) { + ret = klass->seek (base, event); + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + GST_WARNING ("seeking failed %s", gst_flow_get_name (ret)); + goto done; + } + } else { + GST_WARNING ("subclass has no seek implementation"); + goto done; + } + } + + if (flush) { + /* if we sent a FLUSH_START, we now send a FLUSH_STOP */ + GST_DEBUG_OBJECT (base, "sending flush stop"); + //gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ()); + GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, + gst_event_new_flush_stop ()); + } + //else +done: + gst_pad_start_task (base->sinkpad, (GstTaskFunction) mpegts_base_loop, base); + + GST_PAD_STREAM_UNLOCK (base->sinkpad); + return ret == GST_FLOW_OK; +} + + static gboolean mpegts_base_sink_activate (GstPad * pad) { @@ -1227,6 +1320,8 @@ mpegts_base_sink_activate_pull (GstPad * pad, gboolean active) static gboolean mpegts_base_sink_activate_push (GstPad * pad, gboolean active) { + MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad)); + base->mode = BASE_MODE_PUSHING; return TRUE; } @@ -1243,6 +1338,8 @@ mpegts_base_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: mpegts_base_reset (base); + if (base->mode != BASE_MODE_PUSHING) + base->mode = BASE_MODE_SCANNING; break; default: break; diff --git a/gst/mpegtsdemux/mpegtsbase.h b/gst/mpegtsdemux/mpegtsbase.h index 168f2ff1c..01de54e03 100644 --- a/gst/mpegtsdemux/mpegtsbase.h +++ b/gst/mpegtsdemux/mpegtsbase.h @@ -74,7 +74,8 @@ struct _MpegTSBaseProgram typedef enum { BASE_MODE_SCANNING, BASE_MODE_SEEKING, - BASE_MODE_STREAMING + BASE_MODE_STREAMING, + BASE_MODE_PUSHING } MpegTSBaseMode; struct _MpegTSBase { @@ -124,6 +125,7 @@ struct _MpegTSBaseClass { GstElementClass parent_class; /* Virtual methods */ + void (*reset) (MpegTSBase *base); GstFlowReturn (*push) (MpegTSBase *base, MpegTSPacketizerPacket *packet, MpegTSPacketizerSection * section); gboolean (*push_event) (MpegTSBase *base, GstEvent * event); /* program_started gets called when program's pmt arrives for first time */ @@ -139,6 +141,9 @@ struct _MpegTSBaseClass { /* find_timestamps is called to find PCR */ GstFlowReturn (*find_timestamps) (MpegTSBase * base, guint64 initoff, guint64 *offset); + /* seek is called to wait for seeking */ + GstFlowReturn (*seek) (MpegTSBase * base, GstEvent * event); + /* signals */ void (*pat_info) (GstStructure *pat); void (*pmt_info) (GstStructure *pmt); @@ -155,6 +160,8 @@ MpegTSBaseProgram *mpegts_base_add_program (MpegTSBase * base, gint program_numb guint8 *mpegts_get_descriptor_from_stream (MpegTSBaseStream * stream, guint8 tag); guint8 *mpegts_get_descriptor_from_program (MpegTSBaseProgram * program, guint8 tag); +gboolean +mpegts_base_handle_seek_event(MpegTSBase * base, GstPad * pad, GstEvent * event); gboolean gst_mpegtsbase_plugin_init (GstPlugin * plugin); diff --git a/gst/mpegtsdemux/mpegtspacketizer.c b/gst/mpegtsdemux/mpegtspacketizer.c index 6739c0af3..461690642 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.c +++ b/gst/mpegtsdemux/mpegtspacketizer.c @@ -2087,6 +2087,24 @@ mpegts_packetizer_clear (MpegTSPacketizer2 * packetizer) } void +mpegts_packetizer_flush (MpegTSPacketizer2 * packetizer) +{ + if (packetizer->streams) { + int i; + for (i = 0; i < 8192; i++) { + if (packetizer->streams[i]) { + gst_adapter_flush (packetizer->streams[i]->section_adapter, + packetizer->streams[i]->section_adapter->size); + } + } + } + gst_adapter_flush (packetizer->adapter, packetizer->adapter->size); + + packetizer->offset = 0; + packetizer->empty = TRUE; +} + +void mpegts_packetizer_remove_stream (MpegTSPacketizer2 * packetizer, gint16 pid) { MpegTSPacketizerStream *stream = packetizer->streams[pid]; diff --git a/gst/mpegtsdemux/mpegtspacketizer.h b/gst/mpegtsdemux/mpegtspacketizer.h index f40189b60..832862577 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.h +++ b/gst/mpegtsdemux/mpegtspacketizer.h @@ -138,6 +138,7 @@ GType mpegts_packetizer_get_type(void); MpegTSPacketizer2 *mpegts_packetizer_new (void); void mpegts_packetizer_clear (MpegTSPacketizer2 *packetizer); +void mpegts_packetizer_flush (MpegTSPacketizer2 *packetizer); void mpegts_packetizer_push (MpegTSPacketizer2 *packetizer, GstBuffer *buffer); gboolean mpegts_packetizer_has_packets (MpegTSPacketizer2 *packetizer); MpegTSPacketizerPacketReturn mpegts_packetizer_next_packet (MpegTSPacketizer2 *packetizer, diff --git a/gst/mpegtsdemux/tsdemux.c b/gst/mpegtsdemux/tsdemux.c index fe7fb02ba..3c1866320 100644 --- a/gst/mpegtsdemux/tsdemux.c +++ b/gst/mpegtsdemux/tsdemux.c @@ -30,6 +30,8 @@ #include <stdlib.h> #include <string.h> +#include <glib.h> + #include "mpegtsbase.h" #include "tsdemux.h" #include "gstmpegdesc.h" @@ -44,6 +46,12 @@ /* Size of the pendingbuffers array. */ #define TS_MAX_PENDING_BUFFERS 256 +#define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024)) +/* small PCR for wrap detection */ +#define PCR_SMALL 17775000 +/* maximal PCR time */ +#define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298) + GST_DEBUG_CATEGORY_STATIC (ts_demux_debug); #define GST_CAT_DEFAULT ts_demux_debug @@ -173,6 +181,7 @@ static void gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program); static void gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program); +static void gst_ts_demux_reset (MpegTSBase * base); static GstFlowReturn gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, MpegTSPacketizerSection * section); @@ -181,6 +190,10 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream, MpegTSBaseProgram * program); static void gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream); +static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event); +static GstFlowReturn +find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length, + TSPcrOffset * pcroffset); static GstFlowReturn find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset); static void gst_ts_demux_set_property (GObject * object, guint prop_id, @@ -189,8 +202,9 @@ static void gst_ts_demux_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_ts_demux_finalize (GObject * object); static GstFlowReturn -process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr, +process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset, guint numpcr, gboolean isinitial); +static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux); static gboolean push_event (MpegTSBase * base, GstEvent * event); static void _extra_init (GType type); @@ -254,6 +268,7 @@ gst_ts_demux_class_init (GstTSDemuxClass * klass) ts_class = GST_MPEGTS_BASE_CLASS (klass); + ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset); ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push); ts_class->push_event = GST_DEBUG_FUNCPTR (push_event); ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started); @@ -261,6 +276,7 @@ gst_ts_demux_class_init (GstTSDemuxClass * klass) ts_class->stream_added = gst_ts_demux_stream_added; ts_class->stream_removed = gst_ts_demux_stream_removed; ts_class->find_timestamps = GST_DEBUG_FUNCPTR (find_timestamps); + ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek); } static void @@ -270,6 +286,32 @@ gst_ts_demux_init (GstTSDemux * demux, GstTSDemuxClass * klass) demux->program_number = -1; demux->duration = GST_CLOCK_TIME_NONE; GST_MPEGTS_BASE (demux)->stream_size = sizeof (TSDemuxStream); + gst_segment_init (&demux->segment, GST_FORMAT_TIME); + demux->first_pcr = (TSPcrOffset) { + GST_CLOCK_TIME_NONE, 0, 0}; + demux->cur_pcr = (TSPcrOffset) { + 0}; + demux->last_pcr = (TSPcrOffset) { + 0}; +} + +static void +gst_ts_demux_reset (MpegTSBase * base) +{ + GstTSDemux *demux = (GstTSDemux *) base; + g_array_free (demux->index, TRUE); + demux->index = NULL; + demux->index_size = 0; + demux->need_newsegment = TRUE; + demux->program_number = -1; + demux->duration = GST_CLOCK_TIME_NONE; + gst_segment_init (&demux->segment, GST_FORMAT_TIME); + demux->first_pcr = (TSPcrOffset) { + GST_CLOCK_TIME_NONE, 0, 0}; + demux->cur_pcr = (TSPcrOffset) { + 0}; + demux->last_pcr = (TSPcrOffset) { + 0}; } static void @@ -324,6 +366,7 @@ gst_ts_demux_srcpad_query_types (GstPad * pad) { static const GstQueryType query_types[] = { GST_QUERY_DURATION, + GST_QUERY_SEEKING, 0 }; @@ -334,40 +377,327 @@ static gboolean gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query) { gboolean res = TRUE; + GstFormat format; GstTSDemux *demux; demux = GST_TS_DEMUX (gst_pad_get_parent (pad)); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_DURATION: - { - GstFormat format; - + GST_DEBUG ("query duration"); gst_query_parse_duration (query, &format, NULL); - /* can only get position in time */ - if (format != GST_FORMAT_TIME) - goto wrong_format; - - gst_query_set_duration (query, GST_FORMAT_TIME, demux->duration); + if (format == GST_FORMAT_TIME) { + gst_query_set_duration (query, GST_FORMAT_TIME, + demux->segment.duration); + } else { + GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported"); + res = FALSE; + } + break; + case GST_QUERY_SEEKING: + GST_DEBUG ("query seeking"); + gst_query_parse_seeking (query, &format, NULL, NULL, NULL); + if (format == GST_FORMAT_TIME) { + gst_query_set_seeking (query, GST_FORMAT_TIME, + demux->parent.mode != BASE_MODE_PUSHING, 0, + demux->segment.duration); + } else { + GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking"); + res = FALSE; + } break; - } default: res = gst_pad_query_default (pad, query); - break; } -done: gst_object_unref (demux); return res; -wrong_format: - { - GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported"); - res = FALSE; +} + +static inline GstClockTime +calculate_gsttime (TSPcrOffset * start, guint64 pcr) +{ + + GstClockTime time = start->gsttime; + + if (start->pcr > pcr) + time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE - start->pcr) + + PCRTIME_TO_GSTTIME (pcr); + else + time += PCRTIME_TO_GSTTIME (pcr - start->pcr); + + return time; +} + + +static gint +TSPcrOffset_find (gconstpointer a, gconstpointer b, gpointer user_data) +{ + +/* GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */ +/* GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */ +/* GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */ +/* GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */ + + if (((TSPcrOffset *) a)->gsttime < ((TSPcrOffset *) b)->gsttime) + return -1; + else if (((TSPcrOffset *) a)->gsttime > ((TSPcrOffset *) b)->gsttime) + return 1; + else + return 0; +} + +static GstFlowReturn +gst_ts_demux_perform_seek (MpegTSBase * base, GstSegment * segment) +{ + GstTSDemux *demux = (GstTSDemux *) base; + GstFlowReturn res = GST_FLOW_ERROR; + int loop_cnt = 0; + double bias = 1.0; + gint64 desired_offset; + gint64 seekpos = 0; + gint64 time_diff; + GstClockTime seektime; + TSPcrOffset seekpcroffset, pcr_start, pcr_stop, *tmp; + + desired_offset = segment->last_stop; + + seektime = desired_offset + demux->first_pcr.gsttime; + seekpcroffset.gsttime = seektime; + + GST_DEBUG ("seeking to %" GST_TIME_FORMAT, GST_TIME_ARGS (seektime)); + + gst_ts_demux_flush_streams (demux); + + if (G_UNLIKELY (!demux->index)) { + GST_ERROR ("no index"); + goto done; + } + + /* get the first index entry before the seek position */ + tmp = gst_util_array_binary_search (demux->index->data, demux->index_size, + sizeof (*tmp), TSPcrOffset_find, GST_SEARCH_MODE_BEFORE, &seekpcroffset, + NULL); + + if (G_UNLIKELY (!tmp)) { + GST_ERROR ("value not found"); + goto done; + } + + pcr_start = *tmp; + pcr_stop = *(++tmp); + + if (G_UNLIKELY (!pcr_stop.offset)) { + GST_ERROR ("invalid entry"); goto done; } + + /* check if the last recorded pcr can be used */ + if (pcr_start.offset < demux->cur_pcr.offset + && demux->cur_pcr.offset < pcr_stop.offset) { + demux->cur_pcr.gsttime = calculate_gsttime (&pcr_start, demux->cur_pcr.pcr); + if (demux->cur_pcr.gsttime < seekpcroffset.gsttime) + pcr_start = demux->cur_pcr; + else + pcr_stop = demux->cur_pcr; + } + + GST_DEBUG ("start %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, + GST_TIME_ARGS (pcr_start.gsttime), pcr_start.offset); + GST_DEBUG ("stop %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, + GST_TIME_ARGS (pcr_stop.gsttime), pcr_stop.offset); + + time_diff = seektime - pcr_start.gsttime; + seekpcroffset = pcr_start; + + GST_DEBUG ("cur %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT + " time diff: %" G_GINT64_FORMAT, + GST_TIME_ARGS (demux->cur_pcr.gsttime), demux->cur_pcr.offset, time_diff); + + /* seek loop */ + while (loop_cnt++ < 10 && (time_diff < 0 || time_diff > 333 * GST_MSECOND)) { + gint64 duration = pcr_stop.gsttime - pcr_start.gsttime; + gint64 size = pcr_stop.offset - pcr_start.offset; + + seekpos = + pcr_start.offset + size * bias * ((double) (seektime - + pcr_start.gsttime) / duration); + + /* look a litle bit behind */ + seekpos = + MAX (pcr_start.offset + 188, seekpos - 55 * MPEGTS_MAX_PACKETSIZE); + + GST_DEBUG ("looking for time: %" GST_TIME_FORMAT " .. %" GST_TIME_FORMAT + " .. %" GST_TIME_FORMAT " bias = %g", + GST_TIME_ARGS (pcr_start.gsttime), + GST_TIME_ARGS (seektime), GST_TIME_ARGS (pcr_stop.gsttime), bias); + GST_DEBUG ("looking in bytes: %" G_GINT64_FORMAT " .. %" G_GINT64_FORMAT + " .. %" G_GINT64_FORMAT, pcr_start.offset, seekpos, pcr_stop.offset, + bias); + + res = + find_pcr_packet (&demux->parent, seekpos, 4000 * MPEGTS_MAX_PACKETSIZE, + &seekpcroffset); + if (G_UNLIKELY (res == GST_FLOW_UNEXPECTED)) { + seekpos = + MAX ((gint64) pcr_start.offset, + seekpos - 2000 * MPEGTS_MAX_PACKETSIZE) + 188; + res = + find_pcr_packet (&demux->parent, seekpos, + 8000 * MPEGTS_MAX_PACKETSIZE, &seekpcroffset); + } + if (G_UNLIKELY (res != GST_FLOW_OK)) { + GST_WARNING ("seeking failed %s", gst_flow_get_name (res)); + goto done; + } + + seekpcroffset.gsttime = calculate_gsttime (&pcr_start, seekpcroffset.pcr); + + bias = + 1.0 + MAX (-.3, MIN (.3, + ((double) seektime - seekpcroffset.gsttime) / duration)); + + /* validate */ + if (G_UNLIKELY ((seekpcroffset.gsttime < pcr_start.gsttime) || + (seekpcroffset.gsttime > pcr_stop.gsttime))) { + GST_ERROR ("Unexpected timestamp found, seeking failed! %" + GST_TIME_FORMAT, GST_TIME_ARGS (seekpcroffset.gsttime)); + res = GST_FLOW_ERROR; + goto done; + } + + if (seekpcroffset.gsttime > seektime) { + pcr_stop = seekpcroffset; + } else { + pcr_start = seekpcroffset; + } + time_diff = seektime - pcr_start.gsttime; + GST_DEBUG ("looking: %" GST_TIME_FORMAT " found: %" GST_TIME_FORMAT + " diff = %" G_GINT64_FORMAT, GST_TIME_ARGS (seektime), + GST_TIME_ARGS (seekpcroffset.gsttime), time_diff); + } + + GST_DEBUG ("seeking finished after %d loops", loop_cnt); + + + segment->last_stop = seekpcroffset.gsttime; + segment->time = seekpcroffset.gsttime; + + /* we stop at the end */ + if (segment->stop == -1) + segment->stop = segment->duration; + + demux->need_newsegment = TRUE; + demux->parent.seek_offset = seekpcroffset.offset; + GST_DEBUG ("seeked to postion:%" GST_TIME_FORMAT, + GST_TIME_ARGS (seekpcroffset.gsttime)); + res = GST_FLOW_OK; + +done: + return res; +} + + +static GstFlowReturn +gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event) +{ + GstTSDemux *demux = (GstTSDemux *) base; + GstFlowReturn res = GST_FLOW_ERROR; + gdouble rate; + gboolean accurate, flush; + GstFormat format; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + GstSegment seeksegment; + gboolean update; + + gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); + + if (format != GST_FORMAT_TIME) { + goto done; + } + + GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT + " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), + GST_TIME_ARGS (stop)); + + accurate = flags & GST_SEEK_FLAG_ACCURATE; + flush = flags & GST_SEEK_FLAG_FLUSH; + + if (flags & (GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SEGMENT | + GST_SEEK_FLAG_SKIP)) { + GST_WARNING ("seek flags 0x%x are not supported", (int) flags); + goto done; + } + + /* copy segment, we need this because we still need the old + * segment when we close the current segment. */ + memcpy (&seeksegment, &demux->segment, sizeof (GstSegment)); + /* configure the segment with the seek variables */ + GST_DEBUG_OBJECT (demux, "configuring seek"); + GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %" + GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT + " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT, + GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop), + GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum), + GST_TIME_ARGS (seeksegment.last_stop), + GST_TIME_ARGS (seeksegment.duration)); + gst_segment_set_seek (&seeksegment, rate, format, flags, start_type, start, + stop_type, stop, &update); + GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %" + GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT + " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT, + GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop), + GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum), + GST_TIME_ARGS (seeksegment.last_stop), + GST_TIME_ARGS (seeksegment.duration)); + + res = gst_ts_demux_perform_seek (base, &seeksegment); + if (G_UNLIKELY (res != GST_FLOW_OK)) { + GST_WARNING ("seeking failed %s", gst_flow_get_name (res)); + goto done; + } + + /* commit the new segment */ + memcpy (&demux->segment, &seeksegment, sizeof (GstSegment)); + + if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) { + gst_element_post_message (GST_ELEMENT_CAST (demux), + gst_message_new_segment_start (GST_OBJECT_CAST (demux), + demux->segment.format, demux->segment.last_stop)); + } + +done: + return res; } +static gboolean +gst_ts_demux_srcpad_event (GstPad * pad, GstEvent * event) +{ + gboolean res = TRUE; + GstTSDemux *demux = GST_TS_DEMUX (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (pad, "Got event %s", + gst_event_type_get_name (GST_EVENT_TYPE (event))); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event); + if (!res) { + GST_WARNING ("seeking failed"); + } + gst_event_unref (event); + break; + default: + res = gst_pad_event_default (pad, event); + } + + gst_object_unref (demux); + return res; +} static gboolean push_event (MpegTSBase * base, GstEvent * event) @@ -675,6 +1005,7 @@ create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream, gst_pad_set_caps (pad, caps); gst_pad_set_query_type_function (pad, gst_ts_demux_srcpad_query_types); gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query); + gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event); gst_caps_unref (caps); } @@ -726,6 +1057,33 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream) } static void +gst_ts_demux_stream_flush (TSDemuxStream * stream) +{ + gint i; + + stream->pts = GST_CLOCK_TIME_NONE; + + for (i = 0; i < stream->nbpending; i++) + gst_buffer_unref (stream->pendingbuffers[i]); + memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); + stream->nbpending = 0; + + stream->current = NULL; +} + +static void +gst_ts_demux_flush_streams (GstTSDemux * demux) +{ + gint i; + + for (i = 0; i < 0x2000; i++) { + if (demux->program->streams[i]) { + gst_ts_demux_stream_flush ((TSDemuxStream *) demux->program->streams[i]); + } + } +} + +static void gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program) { GstTSDemux *demux = GST_TS_DEMUX (base); @@ -832,6 +1190,171 @@ process_section (MpegTSBase * base) return done; } +static gboolean +process_pes (MpegTSBase * base, TSPcrOffset * pcroffset) +{ + gboolean based, done = FALSE; + MpegTSPacketizerPacket packet; + MpegTSPacketizerPacketReturn pret; + GstTSDemux *demux = GST_TS_DEMUX (base); + guint16 pcr_pid = 0; + + while ((!done) + && ((pret = + mpegts_packetizer_next_packet (base->packetizer, + &packet)) != PACKET_NEED_MORE)) { + if (G_UNLIKELY (pret == PACKET_BAD)) + /* bad header, skip the packet */ + goto next; + + if (demux->program != NULL) { + pcr_pid = demux->program->pcr_pid; + } + + /* base PSI data */ + if (packet.payload != NULL && mpegts_base_is_psi (base, &packet)) { + MpegTSPacketizerSection section; + + based = + mpegts_packetizer_push_section (base->packetizer, &packet, §ion); + if (G_UNLIKELY (!based)) + /* bad section data */ + goto next; + + if (G_LIKELY (section.complete)) { + /* section complete */ + GST_DEBUG ("Section Complete"); + based = mpegts_base_handle_psi (base, §ion); + gst_buffer_unref (section.buffer); + if (G_UNLIKELY (!based)) + /* bad PSI table */ + goto next; + + } + } + if (packet.pid == pcr_pid && (packet.adaptation_field_control & 0x02) + && (packet.afc_flags & MPEGTS_AFC_PCR_FLAG)) { + GST_DEBUG ("PCR[0x%x]: %" G_GINT64_FORMAT, packet.pid, packet.pcr); + pcroffset->pcr = packet.pcr; + pcroffset->offset = packet.offset; + done = TRUE; + } + next: + mpegts_packetizer_clear_packet (base->packetizer, &packet); + } + return done; +} + +static GstFlowReturn +find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length, + TSPcrOffset * pcroffset) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstTSDemux *demux = GST_TS_DEMUX (base); + MpegTSBaseProgram *program; + GstBuffer *buf; + gboolean done = FALSE; + guint64 scan_offset = 0; + + GST_DEBUG ("Scanning for PCR between:%" G_GINT64_FORMAT + " and the end:%" G_GINT64_FORMAT, offset, offset + length); + + /* Get the program */ + program = demux->program; + if (G_UNLIKELY (program == NULL)) + return GST_FLOW_ERROR; + + mpegts_packetizer_flush (base->packetizer); + + while (!done && scan_offset < length) { + ret = + gst_pad_pull_range (base->sinkpad, offset + scan_offset, + 50 * MPEGTS_MAX_PACKETSIZE, &buf); + if (ret != GST_FLOW_OK) + goto beach; + mpegts_packetizer_push (base->packetizer, buf); + done = process_pes (base, pcroffset); + scan_offset += 50 * MPEGTS_MAX_PACKETSIZE; + } + + if (!done || scan_offset >= length) { + GST_WARNING ("No PCR found!"); + ret = GST_FLOW_ERROR; + goto beach; + } + +beach: + mpegts_packetizer_flush (base->packetizer); + return ret; +} + +static gboolean +verify_timestamps (MpegTSBase * base, TSPcrOffset * first, TSPcrOffset * last) +{ + GstTSDemux *demux = GST_TS_DEMUX (base); + guint64 length = 4000 * MPEGTS_MAX_PACKETSIZE; + guint64 offset = PCR_WRAP_SIZE_128KBPS; + + demux->index = + g_array_sized_new (TRUE, TRUE, sizeof (*first), + 2 + 1 + ((last->offset - first->offset) / PCR_WRAP_SIZE_128KBPS)); + + first->gsttime = PCRTIME_TO_GSTTIME (first->pcr); + demux->index = g_array_append_val (demux->index, *first); + demux->index_size++; + demux->first_pcr = *first; + demux->index_pcr = *first; + GST_DEBUG ("first time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT + " offset: %" G_GINT64_FORMAT + " last pcr: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, + GST_TIME_ARGS (first->gsttime), + GST_TIME_ARGS (PCRTIME_TO_GSTTIME (first->pcr)), first->offset, + GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset); + + while (offset + length < last->offset) { + TSPcrOffset half; + GstFlowReturn ret; + gint tries = 0; + + retry: + ret = find_pcr_packet (base, offset, length, &half); + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + GST_WARNING ("no pcr found, retrying"); + if (tries++ < 3) { + offset += length; + length *= 2; + goto retry; + } + return FALSE; + } + + half.gsttime = calculate_gsttime (first, half.pcr); + + GST_DEBUG ("add half time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT + " offset: %" G_GINT64_FORMAT, + GST_TIME_ARGS (half.gsttime), + GST_TIME_ARGS (PCRTIME_TO_GSTTIME (half.pcr)), half.offset); + demux->index = g_array_append_val (demux->index, half); + demux->index_size++; + + length = 4000 * MPEGTS_MAX_PACKETSIZE; + offset += PCR_WRAP_SIZE_128KBPS; + *first = half; + } + + last->gsttime = calculate_gsttime (first, last->pcr); + + GST_DEBUG ("add last time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT + " offset: %" G_GINT64_FORMAT, + GST_TIME_ARGS (last->gsttime), + GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset); + + demux->index = g_array_append_val (demux->index, *last); + demux->index_size++; + + demux->last_pcr = *last; + return TRUE; +} static GstFlowReturn find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset) @@ -844,7 +1367,7 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset) gint64 total_bytes; guint64 scan_offset; guint i = 0; - GstClockTime initial, final; + TSPcrOffset initial, final; GstTSDemux *demux = GST_TS_DEMUX (base); GST_DEBUG ("Scanning for timestamps"); @@ -875,6 +1398,8 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset) mpegts_packetizer_clear (base->packetizer); /* Remove current program so we ensure looking for a PAT when scanning the * for the final PCR */ + gst_structure_free (base->pat); + base->pat = NULL; mpegts_base_remove_program (base, demux->current_program_number); if (ret != GST_FLOW_OK && ret != GST_FLOW_UNEXPECTED) { @@ -922,8 +1447,11 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset) goto beach; } - demux->duration = final - initial; + verify_timestamps (base, &initial, &final); + gst_segment_set_duration (&demux->segment, GST_FORMAT_TIME, + demux->last_pcr.gsttime - demux->first_pcr.gsttime); + demux->duration = demux->last_pcr.gsttime - demux->first_pcr.gsttime; GST_DEBUG ("Done, duration:%" GST_TIME_FORMAT, GST_TIME_ARGS (demux->duration)); @@ -931,13 +1459,15 @@ beach: mpegts_packetizer_clear (base->packetizer); /* Remove current program */ + gst_structure_free (base->pat); + base->pat = NULL; mpegts_base_remove_program (base, demux->current_program_number); return ret; } static GstFlowReturn -process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr, +process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset, guint numpcr, gboolean isinitial) { GstTSDemux *demux = GST_TS_DEMUX (base); @@ -988,15 +1518,31 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr, offset = 0; size = GST_BUFFER_SIZE (buf); - /* FIXME : We should jump to next packet instead of scanning everything */ - while ((size >= br.size) && (nbpcr < numpcr) - && (offset = - gst_byte_reader_masked_scan_uint32 (&br, pcrmask, pcrpattern, - offset, size)) != -1) { + resync: + offset = gst_byte_reader_masked_scan_uint32 (&br, 0xff000000, 0x47000000, + 0, base->packetsize); + + if (offset == -1) + continue; + + while ((nbpcr < numpcr) && (size >= base->packetsize)) { + + guint32 header = GST_READ_UINT32_BE (br.data + offset); + + if ((header >> 24) != 0x47) + goto resync; + + if ((header & pcrmask) != pcrpattern) { + /* Move offset forward by 1 packet */ + size -= base->packetsize; + offset += base->packetsize; + continue; + } + /* Potential PCR */ /* GST_DEBUG ("offset %" G_GUINT64_FORMAT, GST_BUFFER_OFFSET (buf) + offset); GST_MEMDUMP ("something", GST_BUFFER_DATA (buf) + offset, 16);*/ - if ((*(br.data + offset + 5)) & 0x10) { + if ((*(br.data + offset + 5)) & MPEGTS_AFC_PCR_FLAG) { guint64 lpcr = mpegts_packetizer_compute_pcr (br.data + offset + 6); GST_INFO ("Found PCR %" G_GUINT64_FORMAT " %" GST_TIME_FORMAT @@ -1011,6 +1557,9 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr, if (nbpcr > 1) { if (pcrs[nbpcr] == pcrs[nbpcr - 1]) { GST_WARNING ("Found same PCR at different offset"); + } else if (pcrs[nbpcr] < pcrs[nbpcr - 1]) { + GST_WARNING ("Found PCR wraparound"); + nbpcr += 1; } else if ((pcrs[nbpcr] - pcrs[nbpcr - 1]) > (guint64) 10 * 60 * 27000000) { GST_WARNING ("PCR differs with previous PCR by more than 10 mins"); @@ -1019,20 +1568,22 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr, } else nbpcr += 1; } - /* Move offset forward by 1 */ - size -= offset + 1; - offset += 1; - + /* Move offset forward by 1 packet */ + size -= base->packetsize; + offset += base->packetsize; } } beach: GST_DEBUG ("Found %d PCR", nbpcr); if (nbpcr) { - if (isinitial) - *pcr = PCRTIME_TO_GSTTIME (pcrs[0]); - else - *pcr = PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1]); + if (isinitial) { + pcroffset->pcr = pcrs[0]; + pcroffset->offset = pcroffs[0]; + } else { + pcroffset->pcr = pcrs[nbpcr - 1]; + pcroffset->offset = pcroffs[nbpcr - 1]; + } GST_DEBUG ("pcrdiff:%" GST_TIME_FORMAT " offsetdiff %" G_GUINT64_FORMAT, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1] - pcrs[0])), pcroffs[nbpcr - 1] - pcroffs[0]); @@ -1061,6 +1612,18 @@ gst_ts_demux_record_pcr (GstTSDemux * demux, TSDemuxStream * stream, G_GUINT64_FORMAT, bs->pid, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr)), offset); + if (G_LIKELY (bs->pid == demux->program->pcr_pid)) { + demux->cur_pcr.gsttime = GST_CLOCK_TIME_NONE; + demux->cur_pcr.offset = offset; + demux->cur_pcr.pcr = pcr; + /* set first_pcr in push mode */ + if (G_UNLIKELY (!demux->first_pcr.gsttime == GST_CLOCK_TIME_NONE)) { + demux->first_pcr.gsttime = PCRTIME_TO_GSTTIME (pcr); + demux->first_pcr.offset = offset; + demux->first_pcr.pcr = pcr; + } + } + if (G_UNLIKELY (demux->emit_statistics)) { GstStructure *st; st = gst_structure_id_empty_new (QUARK_TSDEMUX); @@ -1139,6 +1702,36 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream, } } +static inline GstClockTime +calc_gsttime_from_pts (TSPcrOffset * start, guint64 pts) +{ + GstClockTime time = start->gsttime - PCRTIME_TO_GSTTIME (start->pcr); + + if (start->pcr > pts * 300) + time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE) + MPEGTIME_TO_GSTTIME (pts); + else + time += MPEGTIME_TO_GSTTIME (pts); + + return time; +} + +static gint +TSPcrOffset_find_offset (gconstpointer a, gconstpointer b, gpointer user_data) +{ + +/* GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */ +/* GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */ +/* GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */ +/* GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */ + + if (((TSPcrOffset *) a)->offset < ((TSPcrOffset *) b)->offset) + return -1; + else if (((TSPcrOffset *) a)->offset > ((TSPcrOffset *) b)->offset) + return 1; + else + return 0; +} + static GstFlowReturn gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream) { @@ -1214,12 +1807,32 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream) /* PTS 32 */ if ((p2 & 0x80)) { /* PTS */ + GstClockTime time; + guint64 offset = GST_BUFFER_OFFSET (stream->pendingbuffers[0]); + READ_TS (data, pts, discont); - gst_ts_demux_record_pts (demux, stream, pts, - GST_BUFFER_OFFSET (stream->pendingbuffers[0])); + gst_ts_demux_record_pts (demux, stream, pts, offset); length -= 4; - GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) = - MPEGTIME_TO_GSTTIME (pts); + + if (demux->index_pcr.offset + PCR_WRAP_SIZE_128KBPS + 1000 * 128 < offset + || (demux->index_pcr.offset > offset)) { + /* find next entry */ + TSPcrOffset *next; + demux->index_pcr.offset = offset; + next = gst_util_array_binary_search (demux->index->data, + demux->index_size, sizeof (*next), TSPcrOffset_find_offset, + GST_SEARCH_MODE_BEFORE, &demux->index_pcr, NULL); + if (next) { + GST_INFO ("new index_pcr %" GST_TIME_FORMAT " offset: %" + G_GINT64_FORMAT, GST_TIME_ARGS (next->gsttime), next->offset); + + demux->index_pcr = *next; + } + } + + time = calc_gsttime_from_pts (&demux->index_pcr, pts); + + GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) = time; if (!GST_CLOCK_TIME_IS_VALID (stream->pts)) { stream->pts = GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]); @@ -1344,7 +1957,6 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) guint i; GstClockTime tinypts = GST_CLOCK_TIME_NONE; - GstClockTime stop = GST_CLOCK_TIME_NONE; GstEvent *newsegmentevent; GST_DEBUG ("stream:%p, pid:0x%04x stream_type:%d state:%d pad:%s:%s", @@ -1381,24 +1993,27 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) tinypts)) tinypts = ((TSDemuxStream *) demux->program->streams[i])->pts; } - - } - if (GST_CLOCK_TIME_IS_VALID (demux->duration)) - stop = tinypts + demux->duration; - - GST_DEBUG ("Sending newsegment event"); + GST_DEBUG ("segment: tinypts: %" GST_TIME_FORMAT " stop: %" + GST_TIME_FORMAT " time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (tinypts), + GST_TIME_ARGS (demux->first_pcr.gsttime + demux->duration), + GST_TIME_ARGS (tinypts - demux->first_pcr.gsttime)); newsegmentevent = - gst_event_new_new_segment (0, 1.0, GST_FORMAT_TIME, tinypts, stop, - 0); + gst_event_new_new_segment (0, 1.0, GST_FORMAT_TIME, tinypts, + demux->first_pcr.gsttime + demux->duration, + tinypts - demux->first_pcr.gsttime); push_event ((MpegTSBase *) demux, newsegmentevent); demux->need_newsegment = FALSE; } - GST_DEBUG_OBJECT (stream->pad, "Pushing buffer list "); + GST_DEBUG_OBJECT (stream->pad, + "Pushing buffer list with timestamp: %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (gst_buffer_list_get + (stream->current, 0, 0)))); res = gst_pad_push_list (stream->pad, stream->current); GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res)); diff --git a/gst/mpegtsdemux/tsdemux.h b/gst/mpegtsdemux/tsdemux.h index bfee2df42..636bc3a67 100644 --- a/gst/mpegtsdemux/tsdemux.h +++ b/gst/mpegtsdemux/tsdemux.h @@ -48,6 +48,14 @@ G_BEGIN_DECLS #define GST_TS_DEMUX_CAST(obj) ((GstTSDemux*) obj) typedef struct _GstTSDemux GstTSDemux; typedef struct _GstTSDemuxClass GstTSDemuxClass; +typedef struct _TSPcrOffset TSPcrOffset; + +struct _TSPcrOffset +{ + guint64 gsttime; + guint64 pcr; + guint64 offset; +}; struct _GstTSDemux { @@ -62,7 +70,16 @@ struct _GstTSDemux MpegTSBaseProgram *program; /* Current program */ guint current_program_number; gboolean need_newsegment; + GstSegment segment; GstClockTime duration; /* Total duration */ + + /* pcr wrap and seeking */ + GArray *index; + gint index_size; + TSPcrOffset first_pcr; + TSPcrOffset last_pcr; + TSPcrOffset cur_pcr; + TSPcrOffset index_pcr; }; struct _GstTSDemuxClass |