From d98bf4effc6c782ea01b572b049020f61b62e56d Mon Sep 17 00:00:00 2001 From: Sebastian Dröge Date: Tue, 5 Jan 2016 16:15:16 +0200 Subject: WIP: rtpjitterbuffer: Add RFC7273 media clock handling --- gst/rtpmanager/gstrtpjitterbuffer.c | 97 +++++++++++++++- gst/rtpmanager/rtpjitterbuffer.c | 220 +++++++++++++++++++++++++++++++++++- gst/rtpmanager/rtpjitterbuffer.h | 13 ++- 3 files changed, 320 insertions(+), 10 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 1349e3294..a1860ab9c 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -100,8 +100,10 @@ #endif #include +#include #include #include +#include #include "gstrtpjitterbuffer.h" #include "rtpjitterbuffer.h" @@ -413,6 +415,8 @@ static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element, static void gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad); static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element); +static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element, + GstClock * clock); /* pad overrides */ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter); @@ -820,6 +824,8 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad); gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock); + gstelement_class->set_clock = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template)); @@ -1129,6 +1135,16 @@ gst_rtp_jitter_buffer_provide_clock (GstElement * element) return gst_system_clock_obtain (); } +static gboolean +gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock) +{ + GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element); + + rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock); + + return TRUE; +} + static void gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer) { @@ -1233,6 +1249,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, GstStructure *caps_struct; guint val; GstClockTime tval; + const gchar *ts_refclk, *mediaclk; priv = jitterbuffer->priv; @@ -1297,6 +1314,75 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT, GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop)); + if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) { + GstClock *clock = NULL; + guint64 clock_offset = -1; + + GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s", + ts_refclk); + + if (g_str_has_prefix (ts_refclk, "ntp=")) { + if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) { + GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks"); + } else { + const gchar *host, *portstr; + gchar *hostname; + guint port; + + host = ts_refclk + sizeof ("ntp=") - 1; + if (host[0] == '[') { + /* IPv6 */ + portstr = strchr (host, ']'); + if (portstr && portstr[1] == ':') + portstr = portstr + 1; + else + portstr = NULL; + } else { + portstr = strrchr (host, ':'); + } + + + if (!portstr || sscanf (portstr, ":%u", &port) != 1) + port = 123; + + if (portstr) + hostname = g_strndup (host, (portstr - host)); + else + hostname = g_strdup (host); + + clock = gst_ntp_clock_new (NULL, hostname, port, 0); + g_free (hostname); + } + } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) { + const gchar *domainstr = + ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1; + guint domain; + + if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1) + domain = 0; + + clock = gst_ptp_clock_new (NULL, domain); + } else { + GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock"); + } + + if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) { + GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk); + + if (!g_str_has_prefix (mediaclk, "direct=") + || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1) + GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock"); + if (strstr (mediaclk, "rate=") != NULL) { + GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported"); + clock_offset = -1; + } + } + + rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset); + } else { + rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1); + } + return TRUE; /* ERRORS */ @@ -1566,7 +1652,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) GST_DEBUG_OBJECT (jitterbuffer, "adding event"); item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); if (head) JBUF_SIGNAL_EVENT (priv); @@ -2625,7 +2711,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, RTPJitterBufferItem *item; item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); } g_list_free (events); @@ -2741,7 +2827,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, - &head, &percent))) + &head, &percent, + gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer))))) goto duplicate; /* update timers */ @@ -3311,7 +3398,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, "retry", G_TYPE_UINT, num_rtx_retry, NULL)); item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); /* remove timer now */ remove_timer (jitterbuffer, timer); @@ -3780,7 +3867,7 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent, RTP_JITTER_BUFFER_MODE_BUFFER) { GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query"); item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); if (head) JBUF_SIGNAL_EVENT (priv); JBUF_WAIT_QUERY (priv, out_flushing); diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index 80d82663c..9b65303f0 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -85,6 +85,8 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass) static void rtp_jitter_buffer_init (RTPJitterBuffer * jbuf) { + g_mutex_init (&jbuf->clock_lock); + jbuf->packets = g_queue_new (); jbuf->mode = RTP_JITTER_BUFFER_MODE_SLAVE; @@ -98,8 +100,19 @@ rtp_jitter_buffer_finalize (GObject * object) jbuf = RTP_JITTER_BUFFER_CAST (object); + if (jbuf->media_clock_synced_id) + g_signal_handler_disconnect (jbuf->media_clock, + jbuf->media_clock_synced_id); + if (jbuf->media_clock) + gst_object_unref (jbuf->media_clock); + + if (jbuf->pipeline_clock) + gst_object_unref (jbuf->pipeline_clock); + g_queue_free (jbuf->packets); + g_mutex_clear (&jbuf->clock_lock); + G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object); } @@ -199,6 +212,97 @@ rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf) return jbuf->clock_rate; } +static void +media_clock_synced_cb (GstClock * clock, gboolean synced, + RTPJitterBuffer * jbuf) +{ + GstClockTime internal, external; + + g_mutex_lock (&jbuf->clock_lock); + if (jbuf->pipeline_clock) { + internal = gst_clock_get_internal_time (jbuf->media_clock); + external = gst_clock_get_time (jbuf->pipeline_clock); + + gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1); + } + g_mutex_unlock (&jbuf->clock_lock); +} + +/** + * rtp_jitter_buffer_set_media_clock: + * @jbuf: an #RTPJitterBuffer + * @clock: (transfer full): media #GstClock + * @clock_offset: RTP time at clock epoch or -1 + * + * Sets the media clock for the media and the clock offset + * + */ +void +rtp_jitter_buffer_set_media_clock (RTPJitterBuffer * jbuf, GstClock * clock, + guint64 clock_offset) +{ + g_mutex_lock (&jbuf->clock_lock); + if (jbuf->media_clock) { + if (jbuf->media_clock_synced_id) + g_signal_handler_disconnect (jbuf->media_clock, + jbuf->media_clock_synced_id); + jbuf->media_clock_synced_id = 0; + gst_object_unref (jbuf->media_clock); + } + jbuf->media_clock = clock; + jbuf->media_clock_offset = clock_offset; + + if (jbuf->pipeline_clock && jbuf->media_clock && + jbuf->pipeline_clock != jbuf->media_clock) { + jbuf->media_clock_synced_id = + g_signal_connect (jbuf->media_clock, "synced", + G_CALLBACK (media_clock_synced_cb), jbuf); + if (gst_clock_is_synced (jbuf->media_clock)) { + GstClockTime internal, external; + + internal = gst_clock_get_internal_time (jbuf->media_clock); + external = gst_clock_get_time (jbuf->pipeline_clock); + + gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1); + } + + gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock); + } + g_mutex_unlock (&jbuf->clock_lock); +} + +/** + * rtp_jitter_buffer_set_pipeline_clock: + * @jbuf: an #RTPJitterBuffer + * @clock: (transfer full): pipeline #GstClock + * + * Sets the pipeline clock + * + */ +void +rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer * jbuf, GstClock * clock) +{ + g_mutex_lock (&jbuf->clock_lock); + if (jbuf->pipeline_clock) + gst_object_unref (jbuf->pipeline_clock); + jbuf->pipeline_clock = clock; + + if (jbuf->pipeline_clock && jbuf->media_clock && + jbuf->pipeline_clock != jbuf->media_clock) { + if (gst_clock_is_synced (jbuf->media_clock)) { + GstClockTime internal, external; + + internal = gst_clock_get_internal_time (jbuf->media_clock); + external = gst_clock_get_time (jbuf->pipeline_clock); + + gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1); + } + + gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock); + } + g_mutex_unlock (&jbuf->clock_lock); +} + /** * rtp_jitter_buffer_reset_skew: * @jbuf: an #RTPJitterBuffer @@ -211,6 +315,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) jbuf->base_time = -1; jbuf->base_rtptime = -1; jbuf->base_extrtp = -1; + jbuf->media_clock_base_time = -1; jbuf->ext_rtptime = -1; jbuf->last_rtptime = -1; jbuf->window_pos = 0; @@ -220,6 +325,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) jbuf->prev_send_diff = -1; jbuf->prev_out_time = -1; jbuf->need_resync = TRUE; + GST_DEBUG ("reset skew correction"); } @@ -642,6 +748,7 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item) * @item: an #RTPJitterBufferItem to insert * @head: TRUE when the head element changed. * @percent: the buffering percent after insertion + * @base_time: base time of the pipeline * * Inserts @item into the packet queue of @jbuf. The sequence number of the * packet will be used to sort the packets. This function takes ownerhip of @@ -655,12 +762,14 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item) */ gboolean rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item, - gboolean * head, gint * percent) + gboolean * head, gint * percent, GstClockTime base_time) { GList *list, *event = NULL; guint32 rtptime; guint16 seqnum; GstClockTime dts; + GstClock *media_clock, *pipeline_clock; + guint64 media_clock_offset; g_return_val_if_fail (jbuf != NULL, FALSE); g_return_val_if_fail (item != NULL, FALSE); @@ -759,9 +868,112 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item, default: break; } - /* do skew calculation by measuring the difference between rtptime and the - * receive dts, this function will return the skew corrected rtptime. */ - item->pts = calculate_skew (jbuf, rtptime, dts); + + g_mutex_lock (&jbuf->clock_lock); + media_clock = jbuf->media_clock ? gst_object_ref (jbuf->media_clock) : NULL; + pipeline_clock = + jbuf->pipeline_clock ? gst_object_ref (jbuf->pipeline_clock) : NULL; + media_clock_offset = jbuf->media_clock_offset; + g_mutex_unlock (&jbuf->clock_lock); + + if (media_clock && pipeline_clock && gst_clock_is_synced (media_clock)) { + if (media_clock_offset != -1) { + GstClockTime ntptime, rtptime_tmp; + GstClockTime ntprtptime, rtpsystime; + GstClockTime internal, external; + GstClockTime rate_num, rate_denom; + + gst_clock_get_calibration (media_clock, &internal, &external, &rate_num, + &rate_denom); + + ntptime = gst_clock_get_internal_time (media_clock); + + ntprtptime = + gst_util_uint64_scale (ntptime, jbuf->clock_rate, GST_SECOND); + ntprtptime += media_clock_offset; + ntprtptime &= 0xffffffff; + + rtptime_tmp = rtptime; + /* Check for wraparounds, we assume that the diff between current RTP + * timestamp and current media clock time can't be bigger than + * 2**31 clock units */ + if (ntprtptime > rtptime_tmp && ntprtptime - rtptime_tmp >= 0x80000000) + rtptime_tmp += G_GUINT64_CONSTANT (0x100000000); + else if (rtptime_tmp > ntprtptime + && rtptime_tmp - ntprtptime >= 0x80000000) + ntprtptime += G_GUINT64_CONSTANT (0x100000000); + + if (ntprtptime > rtptime_tmp) + ntptime -= + gst_util_uint64_scale (ntprtptime - rtptime_tmp, jbuf->clock_rate, + GST_SECOND); + else + ntptime += + gst_util_uint64_scale (rtptime_tmp - ntprtptime, jbuf->clock_rate, + GST_SECOND); + + rtpsystime = + gst_clock_adjust_with_calibration (media_clock, ntptime, internal, + external, rate_num, rate_denom); + /* FIXME: All this assumes that the pipeline has enough additional + * latency to cover for the network delay */ + if (rtpsystime > base_time) + item->pts = rtpsystime - base_time; + else + item->pts = 0; + } else { + GstClockTime internal, external; + GstClockTime rate_num, rate_denom; + GstClockTime nsrtptimediff, rtpntptime, rtpsystime; + GstClockTime ext_rtptime = jbuf->ext_rtptime; + + ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); + + if (jbuf->media_clock_base_time == -1) { + if (dts != -1) + jbuf->media_clock_base_time = + gst_clock_unadjust_with_calibration (media_clock, dts + base_time, + internal, external, rate_num, rate_denom); + else + jbuf->media_clock_base_time = + gst_clock_get_internal_time (media_clock); + jbuf->base_rtptime = ext_rtptime; + } + + if (ext_rtptime > jbuf->base_rtptime) + nsrtptimediff = + gst_util_uint64_scale (ext_rtptime - jbuf->base_rtptime, + jbuf->clock_rate, GST_SECOND); + else + nsrtptimediff = 0; + + rtpntptime = nsrtptimediff + jbuf->media_clock_base_time; + + gst_clock_get_calibration (media_clock, &internal, &external, &rate_num, + &rate_denom); + + rtpsystime = + gst_clock_adjust_with_calibration (media_clock, rtpntptime, internal, + external, rate_num, rate_denom); + + if (rtpsystime > base_time) + item->pts = rtpsystime - base_time; + else + item->pts = 0; + } + } else { + if (media_clock && pipeline_clock) + g_print ("not synced yet\n"); + /* do skew calculation by measuring the difference between rtptime and the + * receive dts, this function will return the skew corrected rtptime. */ + item->pts = calculate_skew (jbuf, rtptime, dts); + } + + if (media_clock) + gst_object_unref (media_clock); + if (pipeline_clock) + gst_object_unref (pipeline_clock); + append: queue_do_insert (jbuf, list, (GList *) item); diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index ba8da6db2..4fe15ac30 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -89,6 +89,7 @@ struct _RTPJitterBuffer { gboolean need_resync; GstClockTime base_time; GstClockTime base_rtptime; + GstClockTime media_clock_base_time; guint32 clock_rate; GstClockTime base_extrtp; GstClockTime prev_out_time; @@ -102,6 +103,12 @@ struct _RTPJitterBuffer { gint64 skew; gint64 prev_send_diff; gboolean buffering_disabled; + + GMutex clock_lock; + GstClock *pipeline_clock; + GstClock *media_clock; + gulong media_clock_synced_id; + guint64 media_clock_offset; }; struct _RTPJitterBufferClass { @@ -150,11 +157,15 @@ void rtp_jitter_buffer_set_delay (RTPJitterBuffer *jbuf, void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, guint32 clock_rate); guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf); +void rtp_jitter_buffer_set_media_clock (RTPJitterBuffer *jbuf, GstClock * clock, guint64 clock_offset); +void rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer *jbuf, GstClock * clock); + void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf); gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, RTPJitterBufferItem *item, - gboolean *head, gint *percent); + gboolean *head, gint *percent, + GstClockTime base_time); void rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled); -- cgit v1.2.3