summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2013-09-18 11:59:28 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2013-09-18 11:59:28 +0200
commitac3bb3acf622e87c2e9a44003b3f9108015ef2cb (patch)
tree57dd30e6f64ec8e906bf9ea74eae08327ee14e6d
parent26402e1c21b22e711c82ba185a7e6682a90154e9 (diff)
rtpjitterbuffer: handle large gaps with one lost event
When we have a large number of missing packets, generate one lost event for all the packets that have no chance of being pushed out in time. Fix and activate unit test for large gaps.
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c53
-rw-r--r--tests/check/elements/rtpjitterbuffer.c88
2 files changed, 99 insertions, 42 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index 799fdba38..493a8f928 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -1721,35 +1721,64 @@ flushing:
}
-static void
+static GstFlowReturn
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime dts, gint gap)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration, expected_dts;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstClockTime total_duration, duration, expected_dts;
TimerType type;
GST_DEBUG_OBJECT (jitterbuffer,
"dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
- /* interpolate between the current time and the last time based on
- * number of packets we are missing, this is the estimated duration
- * for the missing packet based on equidistant packet spacing. Also make
- * sure we never go negative. */
+ /* the total duration spanned by the missing packets */
if (dts >= priv->last_in_dts)
- duration = (dts - priv->last_in_dts) / (gap + 1);
+ total_duration = dts - priv->last_in_dts;
else
- /* packet already lost, timer will timeout quickly */
- duration = 0;
+ total_duration = 0;
+
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. */
+ duration = total_duration / (gap + 1);
GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
+ if (total_duration > priv->latency_ns) {
+ GstClockTime gap_time;
+ guint lost_packets;
+
+ gap_time = total_duration - priv->latency_ns;
+
+ if (duration > 0)
+ lost_packets = gap_time / duration;
+ else
+ lost_packets = gap;
+
+ /* too many lost packets, some of the missing packets are already
+ * too late and we can generate lost packet events for them. */
+ GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT ", consider %u lost",
+ GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
+ lost_packets);
+
+ ret =
+ send_lost_event (jitterbuffer, expected, lost_packets,
+ priv->last_in_dts + duration, gap_time, TRUE);
+
+ expected += lost_packets;
+ priv->last_in_dts += gap_time;
+ }
+
expected_dts = priv->last_in_dts + duration;
if (priv->do_retransmission) {
type = TIMER_TYPE_EXPECTED;
+ /* if we had a timer for the first missing packet, leave it. */
if (find_timer (jitterbuffer, type, expected))
expected++;
} else {
@@ -1761,6 +1790,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
expected_dts += duration;
expected++;
}
+ return ret;
}
static GstFlowReturn
@@ -1884,7 +1914,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
} else {
GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
/* fill in the gap with EXPECTED timers */
- calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+ ret = calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+ if (ret != GST_FLOW_OK)
+ goto out_flushing;
+
do_next_seqnum = TRUE;
}
}
diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c
index dfc9a2bdd..181c56326 100644
--- a/tests/check/elements/rtpjitterbuffer.c
+++ b/tests/check/elements/rtpjitterbuffer.c
@@ -25,6 +25,8 @@
#include <gst/check/gstcheck.h>
#include <gst/check/gsttestclock.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
/* For ease of programming we use globals to keep refs for our floating
* src and sink pads we create; otherwise we always have to do get_pad,
* get_peer, and then remove references in every test function */
@@ -323,7 +325,6 @@ GST_START_TEST (test_basetime)
GST_END_TEST;
-#if 0
static const guint payload_size = 160;
static const guint clock_rate = 8000;
static const guint pcmu_payload_type = 0;
@@ -357,25 +358,30 @@ generate_test_buffer (GstClockTime gst_ts,
GstBuffer *buf;
guint8 *payload;
guint i;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0);
GST_BUFFER_TIMESTAMP (buf) = gst_ts;
- GST_BUFFER_CAPS (buf) = generate_caps ();
- gst_rtp_buffer_set_payload_type (buf, pcmu_payload_type);
- gst_rtp_buffer_set_marker (buf, marker_bit);
- gst_rtp_buffer_set_seq (buf, seq_num);
- gst_rtp_buffer_set_timestamp (buf, rtp_ts);
- gst_rtp_buffer_set_ssrc (buf, test_ssrc);
-
- payload = gst_rtp_buffer_get_payload (buf);
+ //GST_BUFFER_CAPS (buf) = generate_caps ();
+
+ gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
+ gst_rtp_buffer_set_payload_type (&rtp, pcmu_payload_type);
+ gst_rtp_buffer_set_marker (&rtp, marker_bit);
+ gst_rtp_buffer_set_seq (&rtp, seq_num);
+ gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
+ gst_rtp_buffer_set_ssrc (&rtp, test_ssrc);
+
+ payload = gst_rtp_buffer_get_payload (&rtp);
for (i = 0; i < payload_size; i++)
payload[i] = 0xff;
+ gst_rtp_buffer_unmap (&rtp);
+
return buf;
}
static GstFlowReturn
-test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer)
+test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
TestData *data = gst_pad_get_element_private (pad);
g_async_queue_push (data->buf_queue, buffer);
@@ -383,10 +389,13 @@ test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer)
}
static gboolean
-test_sink_pad_event_cb (GstPad * pad, GstEvent * event)
+test_sink_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
{
TestData *data = gst_pad_get_element_private (pad);
const GstStructure *structure = gst_event_get_structure (event);
+
+ GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
+
if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0)
data->lost_event_count++;
@@ -398,6 +407,8 @@ static void
setup_testharness (TestData * data)
{
GstPad *jb_sink_pad, *jb_src_pad;
+ GstSegment seg;
+ GstMiniObject *obj;
// create the testclock
data->clock = gst_test_clock_new ();
@@ -405,7 +416,7 @@ setup_testharness (TestData * data)
gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0);
// rig up the jitter buffer
- data->jitter_buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL);
+ data->jitter_buffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
g_assert (data->jitter_buffer);
gst_element_set_clock (data->jitter_buffer, data->clock);
g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL);
@@ -414,8 +425,7 @@ setup_testharness (TestData * data)
// link in the test source-pad
data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC);
- gst_pad_set_caps (data->test_src_pad, generate_caps ());
- jb_sink_pad = gst_element_get_pad (data->jitter_buffer, "sink");
+ jb_sink_pad = gst_element_get_static_pad (data->jitter_buffer, "sink");
g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==,
GST_PAD_LINK_OK);
g_assert (gst_pad_set_active (data->test_src_pad, TRUE));
@@ -426,7 +436,7 @@ setup_testharness (TestData * data)
gst_pad_set_caps (data->test_sink_pad, generate_caps ());
gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb);
gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb);
- jb_src_pad = gst_element_get_pad (data->jitter_buffer, "src");
+ jb_src_pad = gst_element_get_static_pad (data->jitter_buffer, "src");
g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==,
GST_PAD_LINK_OK);
g_assert (gst_pad_set_active (data->test_sink_pad, TRUE));
@@ -440,6 +450,16 @@ setup_testharness (TestData * data)
data->lost_event_count = 0;
gst_pad_set_element_private (data->test_sink_pad, data);
+
+ gst_segment_init (&seg, GST_FORMAT_TIME);
+
+ gst_pad_push_event (data->test_src_pad,
+ gst_event_new_stream_start ("stream0"));
+ gst_pad_set_caps (data->test_src_pad, generate_caps ());
+ gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
+
+ while ((obj = g_async_queue_try_pop (data->event_queue)))
+ gst_mini_object_unref (obj);
}
static void
@@ -501,12 +521,13 @@ verify_lost_event (GstEvent * event, guint32 expected_seqnum,
GST_START_TEST (test_only_one_lost_event_on_large_gaps)
{
TestData data;
- GstTestClockPendingID id;
+ GstClockID id, test_id;
guint64 timeout;
GstBuffer *in_buf, *out_buf;
GstEvent *out_event;
gint jb_latency_ms = 200;
guint buffer_size_ms = (payload_size * 1000) / clock_rate;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
setup_testharness (&data);
timeout = 20 * G_USEC_PER_SEC;
@@ -519,14 +540,13 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
// wait for the first buffer to be synced to timestamp + latency
- g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK
- (data.clock), &id));
+ gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
// increase the time to timestamp + latency and release the wait
gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
jb_latency_ms * GST_MSECOND);
g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
- == id.clock_id);
+ == id);
// check for the buffer coming out that was pushed in
out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout);
@@ -550,25 +570,27 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
// release the wait
- g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK
- (data.clock), &id));
- g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
- == id.clock_id);
+ GST_DEBUG ("wait for id");
+ gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
+ GST_DEBUG ("got wait id %p", id);
+ gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), GST_MSECOND * 20);
+ test_id = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
+ GST_DEBUG ("process id %p", test_id);
+ g_assert (id == test_id);
// we should now receive a packet-lost-event for buffers 1 through 489
out_event = g_async_queue_timeout_pop (data.event_queue, timeout);
g_assert (out_event != NULL);
g_assert_cmpint (data.lost_event_count, ==, 1);
- verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 489,
+ verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 490,
TRUE);
// churn through sync_times until the new buffer gets pushed out
while (g_async_queue_length (data.buf_queue) < 1) {
if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
- if (id.time > gst_clock_get_time (data.clock)) {
- gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), id.time);
- g_print ("setting time to %" GST_TIME_FORMAT "\n",
- GST_TIME_ARGS (id.time));
+ GstClockTime t = gst_clock_id_get_time (id);
+ if (t > gst_clock_get_time (data.clock)) {
+ gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
}
gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
}
@@ -577,19 +599,21 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout);
g_assert (out_buf != NULL);
g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
- g_assert_cmpint (gst_rtp_buffer_get_seq (out_buf), ==, 500);
+ gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
+ g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 500);
+ gst_rtp_buffer_unmap (&rtp);
g_assert_cmpint (GST_BUFFER_TIMESTAMP (out_buf), ==, (10 * GST_SECOND));
// we get as many lost events as the the number of buffers the jitterbuffer
// is able to wait for (+ the one we already got)
- g_assert_cmpint (data.lost_event_count, ==,
- jb_latency_ms / buffer_size_ms + 1);
+ g_assert_cmpint (data.lost_event_count, ==, jb_latency_ms / buffer_size_ms);
destroy_testharness (&data);
}
GST_END_TEST;
+#if 0
GST_START_TEST (test_two_lost_one_arrives_in_time)
{
TestData data;
@@ -840,8 +864,8 @@ rtpjitterbuffer_suite (void)
tcase_add_test (tc_chain, test_push_backward_seq);
tcase_add_test (tc_chain, test_push_unordered);
tcase_add_test (tc_chain, test_basetime);
-#if 0
tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps);
+#if 0
tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time);
tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events);
tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero);