summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c53
-rw-r--r--tests/check/elements/rtpjitterbuffer.c88
2 files changed, 99 insertions, 42 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index 799fdba38..493a8f928 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -1721,35 +1721,64 @@ flushing:
}
-static void
+static GstFlowReturn
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime dts, gint gap)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration, expected_dts;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstClockTime total_duration, duration, expected_dts;
TimerType type;
GST_DEBUG_OBJECT (jitterbuffer,
"dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
- /* interpolate between the current time and the last time based on
- * number of packets we are missing, this is the estimated duration
- * for the missing packet based on equidistant packet spacing. Also make
- * sure we never go negative. */
+ /* the total duration spanned by the missing packets */
if (dts >= priv->last_in_dts)
- duration = (dts - priv->last_in_dts) / (gap + 1);
+ total_duration = dts - priv->last_in_dts;
else
- /* packet already lost, timer will timeout quickly */
- duration = 0;
+ total_duration = 0;
+
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. */
+ duration = total_duration / (gap + 1);
GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
+ if (total_duration > priv->latency_ns) {
+ GstClockTime gap_time;
+ guint lost_packets;
+
+ gap_time = total_duration - priv->latency_ns;
+
+ if (duration > 0)
+ lost_packets = gap_time / duration;
+ else
+ lost_packets = gap;
+
+ /* too many lost packets, some of the missing packets are already
+ * too late and we can generate lost packet events for them. */
+ GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT ", consider %u lost",
+ GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
+ lost_packets);
+
+ ret =
+ send_lost_event (jitterbuffer, expected, lost_packets,
+ priv->last_in_dts + duration, gap_time, TRUE);
+
+ expected += lost_packets;
+ priv->last_in_dts += gap_time;
+ }
+
expected_dts = priv->last_in_dts + duration;
if (priv->do_retransmission) {
type = TIMER_TYPE_EXPECTED;
+ /* if we had a timer for the first missing packet, leave it. */
if (find_timer (jitterbuffer, type, expected))
expected++;
} else {
@@ -1761,6 +1790,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
expected_dts += duration;
expected++;
}
+ return ret;
}
static GstFlowReturn
@@ -1884,7 +1914,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
} else {
GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
/* fill in the gap with EXPECTED timers */
- calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+ ret = calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+ if (ret != GST_FLOW_OK)
+ goto out_flushing;
+
do_next_seqnum = TRUE;
}
}
diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c
index dfc9a2bdd..181c56326 100644
--- a/tests/check/elements/rtpjitterbuffer.c
+++ b/tests/check/elements/rtpjitterbuffer.c
@@ -25,6 +25,8 @@
#include <gst/check/gstcheck.h>
#include <gst/check/gsttestclock.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
/* For ease of programming we use globals to keep refs for our floating
* src and sink pads we create; otherwise we always have to do get_pad,
* get_peer, and then remove references in every test function */
@@ -323,7 +325,6 @@ GST_START_TEST (test_basetime)
GST_END_TEST;
-#if 0
static const guint payload_size = 160;
static const guint clock_rate = 8000;
static const guint pcmu_payload_type = 0;
@@ -357,25 +358,30 @@ generate_test_buffer (GstClockTime gst_ts,
GstBuffer *buf;
guint8 *payload;
guint i;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0);
GST_BUFFER_TIMESTAMP (buf) = gst_ts;
- GST_BUFFER_CAPS (buf) = generate_caps ();
- gst_rtp_buffer_set_payload_type (buf, pcmu_payload_type);
- gst_rtp_buffer_set_marker (buf, marker_bit);
- gst_rtp_buffer_set_seq (buf, seq_num);
- gst_rtp_buffer_set_timestamp (buf, rtp_ts);
- gst_rtp_buffer_set_ssrc (buf, test_ssrc);
-
- payload = gst_rtp_buffer_get_payload (buf);
+ //GST_BUFFER_CAPS (buf) = generate_caps ();
+
+ gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
+ gst_rtp_buffer_set_payload_type (&rtp, pcmu_payload_type);
+ gst_rtp_buffer_set_marker (&rtp, marker_bit);
+ gst_rtp_buffer_set_seq (&rtp, seq_num);
+ gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
+ gst_rtp_buffer_set_ssrc (&rtp, test_ssrc);
+
+ payload = gst_rtp_buffer_get_payload (&rtp);
for (i = 0; i < payload_size; i++)
payload[i] = 0xff;
+ gst_rtp_buffer_unmap (&rtp);
+
return buf;
}
static GstFlowReturn
-test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer)
+test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
TestData *data = gst_pad_get_element_private (pad);
g_async_queue_push (data->buf_queue, buffer);
@@ -383,10 +389,13 @@ test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer)
}
static gboolean
-test_sink_pad_event_cb (GstPad * pad, GstEvent * event)
+test_sink_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
{
TestData *data = gst_pad_get_element_private (pad);
const GstStructure *structure = gst_event_get_structure (event);
+
+ GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
+
if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0)
data->lost_event_count++;
@@ -398,6 +407,8 @@ static void
setup_testharness (TestData * data)
{
GstPad *jb_sink_pad, *jb_src_pad;
+ GstSegment seg;
+ GstMiniObject *obj;
// create the testclock
data->clock = gst_test_clock_new ();
@@ -405,7 +416,7 @@ setup_testharness (TestData * data)
gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0);
// rig up the jitter buffer
- data->jitter_buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL);
+ data->jitter_buffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
g_assert (data->jitter_buffer);
gst_element_set_clock (data->jitter_buffer, data->clock);
g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL);
@@ -414,8 +425,7 @@ setup_testharness (TestData * data)
// link in the test source-pad
data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC);
- gst_pad_set_caps (data->test_src_pad, generate_caps ());
- jb_sink_pad = gst_element_get_pad (data->jitter_buffer, "sink");
+ jb_sink_pad = gst_element_get_static_pad (data->jitter_buffer, "sink");
g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==,
GST_PAD_LINK_OK);
g_assert (gst_pad_set_active (data->test_src_pad, TRUE));
@@ -426,7 +436,7 @@ setup_testharness (TestData * data)
gst_pad_set_caps (data->test_sink_pad, generate_caps ());
gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb);
gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb);
- jb_src_pad = gst_element_get_pad (data->jitter_buffer, "src");
+ jb_src_pad = gst_element_get_static_pad (data->jitter_buffer, "src");
g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==,
GST_PAD_LINK_OK);
g_assert (gst_pad_set_active (data->test_sink_pad, TRUE));
@@ -440,6 +450,16 @@ setup_testharness (TestData * data)
data->lost_event_count = 0;
gst_pad_set_element_private (data->test_sink_pad, data);
+
+ gst_segment_init (&seg, GST_FORMAT_TIME);
+
+ gst_pad_push_event (data->test_src_pad,
+ gst_event_new_stream_start ("stream0"));
+ gst_pad_set_caps (data->test_src_pad, generate_caps ());
+ gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
+
+ while ((obj = g_async_queue_try_pop (data->event_queue)))
+ gst_mini_object_unref (obj);
}
static void
@@ -501,12 +521,13 @@ verify_lost_event (GstEvent * event, guint32 expected_seqnum,
GST_START_TEST (test_only_one_lost_event_on_large_gaps)
{
TestData data;
- GstTestClockPendingID id;
+ GstClockID id, test_id;
guint64 timeout;
GstBuffer *in_buf, *out_buf;
GstEvent *out_event;
gint jb_latency_ms = 200;
guint buffer_size_ms = (payload_size * 1000) / clock_rate;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
setup_testharness (&data);
timeout = 20 * G_USEC_PER_SEC;
@@ -519,14 +540,13 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
// wait for the first buffer to be synced to timestamp + latency
- g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK
- (data.clock), &id));
+ gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
// increase the time to timestamp + latency and release the wait
gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
jb_latency_ms * GST_MSECOND);
g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
- == id.clock_id);
+ == id);
// check for the buffer coming out that was pushed in
out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout);
@@ -550,25 +570,27 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
// release the wait
- g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK
- (data.clock), &id));
- g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
- == id.clock_id);
+ GST_DEBUG ("wait for id");
+ gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
+ GST_DEBUG ("got wait id %p", id);
+ gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), GST_MSECOND * 20);
+ test_id = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
+ GST_DEBUG ("process id %p", test_id);
+ g_assert (id == test_id);
// we should now receive a packet-lost-event for buffers 1 through 489
out_event = g_async_queue_timeout_pop (data.event_queue, timeout);
g_assert (out_event != NULL);
g_assert_cmpint (data.lost_event_count, ==, 1);
- verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 489,
+ verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 490,
TRUE);
// churn through sync_times until the new buffer gets pushed out
while (g_async_queue_length (data.buf_queue) < 1) {
if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
- if (id.time > gst_clock_get_time (data.clock)) {
- gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), id.time);
- g_print ("setting time to %" GST_TIME_FORMAT "\n",
- GST_TIME_ARGS (id.time));
+ GstClockTime t = gst_clock_id_get_time (id);
+ if (t > gst_clock_get_time (data.clock)) {
+ gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
}
gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
}
@@ -577,19 +599,21 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout);
g_assert (out_buf != NULL);
g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
- g_assert_cmpint (gst_rtp_buffer_get_seq (out_buf), ==, 500);
+ gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
+ g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 500);
+ gst_rtp_buffer_unmap (&rtp);
g_assert_cmpint (GST_BUFFER_TIMESTAMP (out_buf), ==, (10 * GST_SECOND));
// we get as many lost events as the the number of buffers the jitterbuffer
// is able to wait for (+ the one we already got)
- g_assert_cmpint (data.lost_event_count, ==,
- jb_latency_ms / buffer_size_ms + 1);
+ g_assert_cmpint (data.lost_event_count, ==, jb_latency_ms / buffer_size_ms);
destroy_testharness (&data);
}
GST_END_TEST;
+#if 0
GST_START_TEST (test_two_lost_one_arrives_in_time)
{
TestData data;
@@ -840,8 +864,8 @@ rtpjitterbuffer_suite (void)
tcase_add_test (tc_chain, test_push_backward_seq);
tcase_add_test (tc_chain, test_push_unordered);
tcase_add_test (tc_chain, test_basetime);
-#if 0
tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps);
+#if 0
tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time);
tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events);
tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero);