summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2016-01-05 16:15:16 +0200
committerSebastian Dröge <sebastian@centricular.com>2016-01-06 21:44:56 +0200
commitd98bf4effc6c782ea01b572b049020f61b62e56d (patch)
treee04f726764b834ffab2193a4284818612bafabf8
parent49175153420b7f0696981d2f94e8e66196e325a5 (diff)
WIP: rtpjitterbuffer: Add RFC7273 media clock handlingrfc7273
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c97
-rw-r--r--gst/rtpmanager/rtpjitterbuffer.c220
-rw-r--r--gst/rtpmanager/rtpjitterbuffer.h13
3 files changed, 320 insertions, 10 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index 1349e3294..a1860ab9c 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -100,8 +100,10 @@
#endif
#include <stdlib.h>
+#include <stdio.h>
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/net/net.h>
#include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h"
@@ -413,6 +415,8 @@ static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
GstPad * pad);
static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
+static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
+ GstClock * clock);
/* pad overrides */
static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
@@ -820,6 +824,8 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
gstelement_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
+ gstelement_class->set_clock =
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
@@ -1129,6 +1135,16 @@ gst_rtp_jitter_buffer_provide_clock (GstElement * element)
return gst_system_clock_obtain ();
}
+static gboolean
+gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
+{
+ GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+
+ rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
+
+ return TRUE;
+}
+
static void
gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
{
@@ -1233,6 +1249,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
GstStructure *caps_struct;
guint val;
GstClockTime tval;
+ const gchar *ts_refclk, *mediaclk;
priv = jitterbuffer->priv;
@@ -1297,6 +1314,75 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
"npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
+ if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
+ GstClock *clock = NULL;
+ guint64 clock_offset = -1;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
+ ts_refclk);
+
+ if (g_str_has_prefix (ts_refclk, "ntp=")) {
+ if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
+ GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
+ } else {
+ const gchar *host, *portstr;
+ gchar *hostname;
+ guint port;
+
+ host = ts_refclk + sizeof ("ntp=") - 1;
+ if (host[0] == '[') {
+ /* IPv6 */
+ portstr = strchr (host, ']');
+ if (portstr && portstr[1] == ':')
+ portstr = portstr + 1;
+ else
+ portstr = NULL;
+ } else {
+ portstr = strrchr (host, ':');
+ }
+
+
+ if (!portstr || sscanf (portstr, ":%u", &port) != 1)
+ port = 123;
+
+ if (portstr)
+ hostname = g_strndup (host, (portstr - host));
+ else
+ hostname = g_strdup (host);
+
+ clock = gst_ntp_clock_new (NULL, hostname, port, 0);
+ g_free (hostname);
+ }
+ } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
+ const gchar *domainstr =
+ ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
+ guint domain;
+
+ if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
+ domain = 0;
+
+ clock = gst_ptp_clock_new (NULL, domain);
+ } else {
+ GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
+ }
+
+ if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
+
+ if (!g_str_has_prefix (mediaclk, "direct=")
+ || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
+ GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
+ if (strstr (mediaclk, "rate=") != NULL) {
+ GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
+ clock_offset = -1;
+ }
+ }
+
+ rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
+ } else {
+ rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
+ }
+
return TRUE;
/* ERRORS */
@@ -1566,7 +1652,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
if (head)
JBUF_SIGNAL_EVENT (priv);
@@ -2625,7 +2711,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
RTPJitterBufferItem *item;
item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
}
g_list_free (events);
@@ -2741,7 +2827,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
- &head, &percent)))
+ &head, &percent,
+ gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)))))
goto duplicate;
/* update timers */
@@ -3311,7 +3398,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
"retry", G_TYPE_UINT, num_rtx_retry, NULL));
item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
/* remove timer now */
remove_timer (jitterbuffer, timer);
@@ -3780,7 +3867,7 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
RTP_JITTER_BUFFER_MODE_BUFFER) {
GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
if (head)
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUERY (priv, out_flushing);
diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c
index 80d82663c..9b65303f0 100644
--- a/gst/rtpmanager/rtpjitterbuffer.c
+++ b/gst/rtpmanager/rtpjitterbuffer.c
@@ -85,6 +85,8 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
static void
rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
{
+ g_mutex_init (&jbuf->clock_lock);
+
jbuf->packets = g_queue_new ();
jbuf->mode = RTP_JITTER_BUFFER_MODE_SLAVE;
@@ -98,8 +100,19 @@ rtp_jitter_buffer_finalize (GObject * object)
jbuf = RTP_JITTER_BUFFER_CAST (object);
+ if (jbuf->media_clock_synced_id)
+ g_signal_handler_disconnect (jbuf->media_clock,
+ jbuf->media_clock_synced_id);
+ if (jbuf->media_clock)
+ gst_object_unref (jbuf->media_clock);
+
+ if (jbuf->pipeline_clock)
+ gst_object_unref (jbuf->pipeline_clock);
+
g_queue_free (jbuf->packets);
+ g_mutex_clear (&jbuf->clock_lock);
+
G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object);
}
@@ -199,6 +212,97 @@ rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf)
return jbuf->clock_rate;
}
+static void
+media_clock_synced_cb (GstClock * clock, gboolean synced,
+ RTPJitterBuffer * jbuf)
+{
+ GstClockTime internal, external;
+
+ g_mutex_lock (&jbuf->clock_lock);
+ if (jbuf->pipeline_clock) {
+ internal = gst_clock_get_internal_time (jbuf->media_clock);
+ external = gst_clock_get_time (jbuf->pipeline_clock);
+
+ gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
+ }
+ g_mutex_unlock (&jbuf->clock_lock);
+}
+
+/**
+ * rtp_jitter_buffer_set_media_clock:
+ * @jbuf: an #RTPJitterBuffer
+ * @clock: (transfer full): media #GstClock
+ * @clock_offset: RTP time at clock epoch or -1
+ *
+ * Sets the media clock for the media and the clock offset
+ *
+ */
+void
+rtp_jitter_buffer_set_media_clock (RTPJitterBuffer * jbuf, GstClock * clock,
+ guint64 clock_offset)
+{
+ g_mutex_lock (&jbuf->clock_lock);
+ if (jbuf->media_clock) {
+ if (jbuf->media_clock_synced_id)
+ g_signal_handler_disconnect (jbuf->media_clock,
+ jbuf->media_clock_synced_id);
+ jbuf->media_clock_synced_id = 0;
+ gst_object_unref (jbuf->media_clock);
+ }
+ jbuf->media_clock = clock;
+ jbuf->media_clock_offset = clock_offset;
+
+ if (jbuf->pipeline_clock && jbuf->media_clock &&
+ jbuf->pipeline_clock != jbuf->media_clock) {
+ jbuf->media_clock_synced_id =
+ g_signal_connect (jbuf->media_clock, "synced",
+ G_CALLBACK (media_clock_synced_cb), jbuf);
+ if (gst_clock_is_synced (jbuf->media_clock)) {
+ GstClockTime internal, external;
+
+ internal = gst_clock_get_internal_time (jbuf->media_clock);
+ external = gst_clock_get_time (jbuf->pipeline_clock);
+
+ gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
+ }
+
+ gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
+ }
+ g_mutex_unlock (&jbuf->clock_lock);
+}
+
+/**
+ * rtp_jitter_buffer_set_pipeline_clock:
+ * @jbuf: an #RTPJitterBuffer
+ * @clock: (transfer full): pipeline #GstClock
+ *
+ * Sets the pipeline clock
+ *
+ */
+void
+rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer * jbuf, GstClock * clock)
+{
+ g_mutex_lock (&jbuf->clock_lock);
+ if (jbuf->pipeline_clock)
+ gst_object_unref (jbuf->pipeline_clock);
+ jbuf->pipeline_clock = clock;
+
+ if (jbuf->pipeline_clock && jbuf->media_clock &&
+ jbuf->pipeline_clock != jbuf->media_clock) {
+ if (gst_clock_is_synced (jbuf->media_clock)) {
+ GstClockTime internal, external;
+
+ internal = gst_clock_get_internal_time (jbuf->media_clock);
+ external = gst_clock_get_time (jbuf->pipeline_clock);
+
+ gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
+ }
+
+ gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
+ }
+ g_mutex_unlock (&jbuf->clock_lock);
+}
+
/**
* rtp_jitter_buffer_reset_skew:
* @jbuf: an #RTPJitterBuffer
@@ -211,6 +315,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
jbuf->base_time = -1;
jbuf->base_rtptime = -1;
jbuf->base_extrtp = -1;
+ jbuf->media_clock_base_time = -1;
jbuf->ext_rtptime = -1;
jbuf->last_rtptime = -1;
jbuf->window_pos = 0;
@@ -220,6 +325,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
jbuf->prev_send_diff = -1;
jbuf->prev_out_time = -1;
jbuf->need_resync = TRUE;
+
GST_DEBUG ("reset skew correction");
}
@@ -642,6 +748,7 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
* @item: an #RTPJitterBufferItem to insert
* @head: TRUE when the head element changed.
* @percent: the buffering percent after insertion
+ * @base_time: base time of the pipeline
*
* Inserts @item into the packet queue of @jbuf. The sequence number of the
* packet will be used to sort the packets. This function takes ownerhip of
@@ -655,12 +762,14 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
*/
gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
- gboolean * head, gint * percent)
+ gboolean * head, gint * percent, GstClockTime base_time)
{
GList *list, *event = NULL;
guint32 rtptime;
guint16 seqnum;
GstClockTime dts;
+ GstClock *media_clock, *pipeline_clock;
+ guint64 media_clock_offset;
g_return_val_if_fail (jbuf != NULL, FALSE);
g_return_val_if_fail (item != NULL, FALSE);
@@ -759,9 +868,112 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
default:
break;
}
- /* do skew calculation by measuring the difference between rtptime and the
- * receive dts, this function will return the skew corrected rtptime. */
- item->pts = calculate_skew (jbuf, rtptime, dts);
+
+ g_mutex_lock (&jbuf->clock_lock);
+ media_clock = jbuf->media_clock ? gst_object_ref (jbuf->media_clock) : NULL;
+ pipeline_clock =
+ jbuf->pipeline_clock ? gst_object_ref (jbuf->pipeline_clock) : NULL;
+ media_clock_offset = jbuf->media_clock_offset;
+ g_mutex_unlock (&jbuf->clock_lock);
+
+ if (media_clock && pipeline_clock && gst_clock_is_synced (media_clock)) {
+ if (media_clock_offset != -1) {
+ GstClockTime ntptime, rtptime_tmp;
+ GstClockTime ntprtptime, rtpsystime;
+ GstClockTime internal, external;
+ GstClockTime rate_num, rate_denom;
+
+ gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
+ &rate_denom);
+
+ ntptime = gst_clock_get_internal_time (media_clock);
+
+ ntprtptime =
+ gst_util_uint64_scale (ntptime, jbuf->clock_rate, GST_SECOND);
+ ntprtptime += media_clock_offset;
+ ntprtptime &= 0xffffffff;
+
+ rtptime_tmp = rtptime;
+ /* Check for wraparounds, we assume that the diff between current RTP
+ * timestamp and current media clock time can't be bigger than
+ * 2**31 clock units */
+ if (ntprtptime > rtptime_tmp && ntprtptime - rtptime_tmp >= 0x80000000)
+ rtptime_tmp += G_GUINT64_CONSTANT (0x100000000);
+ else if (rtptime_tmp > ntprtptime
+ && rtptime_tmp - ntprtptime >= 0x80000000)
+ ntprtptime += G_GUINT64_CONSTANT (0x100000000);
+
+ if (ntprtptime > rtptime_tmp)
+ ntptime -=
+ gst_util_uint64_scale (ntprtptime - rtptime_tmp, jbuf->clock_rate,
+ GST_SECOND);
+ else
+ ntptime +=
+ gst_util_uint64_scale (rtptime_tmp - ntprtptime, jbuf->clock_rate,
+ GST_SECOND);
+
+ rtpsystime =
+ gst_clock_adjust_with_calibration (media_clock, ntptime, internal,
+ external, rate_num, rate_denom);
+ /* FIXME: All this assumes that the pipeline has enough additional
+ * latency to cover for the network delay */
+ if (rtpsystime > base_time)
+ item->pts = rtpsystime - base_time;
+ else
+ item->pts = 0;
+ } else {
+ GstClockTime internal, external;
+ GstClockTime rate_num, rate_denom;
+ GstClockTime nsrtptimediff, rtpntptime, rtpsystime;
+ GstClockTime ext_rtptime = jbuf->ext_rtptime;
+
+ ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
+
+ if (jbuf->media_clock_base_time == -1) {
+ if (dts != -1)
+ jbuf->media_clock_base_time =
+ gst_clock_unadjust_with_calibration (media_clock, dts + base_time,
+ internal, external, rate_num, rate_denom);
+ else
+ jbuf->media_clock_base_time =
+ gst_clock_get_internal_time (media_clock);
+ jbuf->base_rtptime = ext_rtptime;
+ }
+
+ if (ext_rtptime > jbuf->base_rtptime)
+ nsrtptimediff =
+ gst_util_uint64_scale (ext_rtptime - jbuf->base_rtptime,
+ jbuf->clock_rate, GST_SECOND);
+ else
+ nsrtptimediff = 0;
+
+ rtpntptime = nsrtptimediff + jbuf->media_clock_base_time;
+
+ gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
+ &rate_denom);
+
+ rtpsystime =
+ gst_clock_adjust_with_calibration (media_clock, rtpntptime, internal,
+ external, rate_num, rate_denom);
+
+ if (rtpsystime > base_time)
+ item->pts = rtpsystime - base_time;
+ else
+ item->pts = 0;
+ }
+ } else {
+ if (media_clock && pipeline_clock)
+ g_print ("not synced yet\n");
+ /* do skew calculation by measuring the difference between rtptime and the
+ * receive dts, this function will return the skew corrected rtptime. */
+ item->pts = calculate_skew (jbuf, rtptime, dts);
+ }
+
+ if (media_clock)
+ gst_object_unref (media_clock);
+ if (pipeline_clock)
+ gst_object_unref (pipeline_clock);
+
append:
queue_do_insert (jbuf, list, (GList *) item);
diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h
index ba8da6db2..4fe15ac30 100644
--- a/gst/rtpmanager/rtpjitterbuffer.h
+++ b/gst/rtpmanager/rtpjitterbuffer.h
@@ -89,6 +89,7 @@ struct _RTPJitterBuffer {
gboolean need_resync;
GstClockTime base_time;
GstClockTime base_rtptime;
+ GstClockTime media_clock_base_time;
guint32 clock_rate;
GstClockTime base_extrtp;
GstClockTime prev_out_time;
@@ -102,6 +103,12 @@ struct _RTPJitterBuffer {
gint64 skew;
gint64 prev_send_diff;
gboolean buffering_disabled;
+
+ GMutex clock_lock;
+ GstClock *pipeline_clock;
+ GstClock *media_clock;
+ gulong media_clock_synced_id;
+ guint64 media_clock_offset;
};
struct _RTPJitterBufferClass {
@@ -150,11 +157,15 @@ void rtp_jitter_buffer_set_delay (RTPJitterBuffer *jbuf,
void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, guint32 clock_rate);
guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf);
+void rtp_jitter_buffer_set_media_clock (RTPJitterBuffer *jbuf, GstClock * clock, guint64 clock_offset);
+void rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer *jbuf, GstClock * clock);
+
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf,
RTPJitterBufferItem *item,
- gboolean *head, gint *percent);
+ gboolean *head, gint *percent,
+ GstClockTime base_time);
void rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled);