diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2017-10-12 21:00:16 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2018-01-22 12:38:24 +0200 |
commit | 0a8a00396628dd43714ffe495192d748d8f7f402 (patch) | |
tree | fc186cd160325fd3086d76a7e977f1ef71874749 | |
parent | 4ec17b1975d38c3a87d593751197b9503ae14fba (diff) |
rtsp-media: Add support for sending+receiving medias
We need to add an appsrc/appsink in that case because otherwise the
media bin will be a sink and a source for rtpbin, causing a pipeline
loop.
-rw-r--r-- | gst/rtsp-server/rtsp-media.c | 168 |
1 files changed, 162 insertions, 6 deletions
diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index cebd75f..4b1e76a 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -119,6 +119,7 @@ struct _GstRTSPMediaPrivate GSource *source; guint id; GstRTSPThread *thread; + GList *pending_pipeline_elements; gboolean time_provider; GstNetTimeProvider *nettime; @@ -462,6 +463,7 @@ gst_rtsp_media_finalize (GObject * obj) g_ptr_array_unref (priv->streams); g_list_free_full (priv->dynamic, gst_object_unref); + g_list_free_full (priv->pending_pipeline_elements, gst_object_unref); if (priv->pipeline) gst_object_unref (priv->pipeline); @@ -867,6 +869,7 @@ gst_rtsp_media_take_pipeline (GstRTSPMedia * media, GstPipeline * pipeline) GstRTSPMediaPrivate *priv; GstElement *old; GstNetTimeProvider *nettime; + GList *l; g_return_if_fail (GST_IS_RTSP_MEDIA (media)); g_return_if_fail (GST_IS_PIPELINE (pipeline)); @@ -887,6 +890,12 @@ gst_rtsp_media_take_pipeline (GstRTSPMedia * media, GstPipeline * pipeline) gst_object_unref (nettime); gst_bin_add (GST_BIN_CAST (pipeline), priv->element); + + for (l = priv->pending_pipeline_elements; l; l = l->next) { + gst_bin_add (GST_BIN_CAST (pipeline), l->data); + } + g_list_free (priv->pending_pipeline_elements); + priv->pending_pipeline_elements = NULL; } /** @@ -1852,6 +1861,78 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media) } } +typedef struct +{ + GstElement *appsink, *appsrc; + GstRTSPStream *stream; +} AppSinkSrcData; + +static GstFlowReturn +appsink_new_sample (GstAppSink * appsink, gpointer user_data) +{ + AppSinkSrcData *data = user_data; + GstSample *sample; + GstFlowReturn ret; + + sample = gst_app_sink_pull_sample (appsink); + if (!sample) + return GST_FLOW_FLUSHING; + + + ret = gst_app_src_push_sample (GST_APP_SRC (data->appsrc), sample); + gst_sample_unref (sample); + return ret; +} + +static GstAppSinkCallbacks appsink_callbacks = { + NULL, + NULL, + appsink_new_sample, +}; + +static GstPadProbeReturn +appsink_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + AppSinkSrcData *data = user_data; + + if (GST_IS_EVENT (info->data) + && GST_EVENT_TYPE (info->data) == GST_EVENT_LATENCY) { + GstClockTime min, max; + + if (gst_base_sink_query_latency (GST_BASE_SINK (data->appsink), NULL, NULL, + &min, &max)) { + g_object_set (data->appsrc, "min-latency", min, "max-latency", max, NULL); + g_print ("setting latency to %lu %lu\n", min, max); + } + } else if (GST_IS_QUERY (info->data)) { + GstPad *srcpad = gst_element_get_static_pad (data->appsrc, "src"); + if (gst_pad_peer_query (srcpad, GST_QUERY_CAST (info->data))) { + gst_object_unref (srcpad); + return GST_PAD_PROBE_HANDLED; + } + gst_object_unref (srcpad); + } + + return GST_PAD_PROBE_OK; +} + +static GstPadProbeReturn +appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + AppSinkSrcData *data = user_data; + + if (GST_IS_QUERY (info->data)) { + GstPad *sinkpad = gst_element_get_static_pad (data->appsink, "sink"); + if (gst_pad_peer_query (sinkpad, GST_QUERY_CAST (info->data))) { + gst_object_unref (sinkpad); + return GST_PAD_PROBE_HANDLED; + } + gst_object_unref (sinkpad); + } + + return GST_PAD_PROBE_OK; +} + /** * gst_rtsp_media_create_stream: * @media: a #GstRTSPMedia @@ -1870,9 +1951,10 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader, { GstRTSPMediaPrivate *priv; GstRTSPStream *stream; - GstPad *ghostpad; + GstPad *streampad; gchar *name; gint idx; + AppSinkSrcData *data = NULL; g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL); g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL); @@ -1890,12 +1972,71 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader, else name = g_strdup_printf ("sink_%u", idx); - ghostpad = gst_ghost_pad_new (name, pad); - gst_pad_set_active (ghostpad, TRUE); - gst_element_add_pad (priv->element, ghostpad); + if ((GST_PAD_IS_SRC (pad) && priv->element->numsinkpads > 0) || + (GST_PAD_IS_SINK (pad) && priv->element->numsrcpads > 0)) { + GstElement *appsink, *appsrc; + GstPad *sinkpad, *srcpad; + + appsink = gst_element_factory_make ("appsink", NULL); + appsrc = gst_element_factory_make ("appsrc", NULL); + + if (GST_PAD_IS_SINK (pad)) { + srcpad = gst_element_get_static_pad (appsrc, "src"); + + gst_bin_add (GST_BIN (priv->element), appsrc); + + gst_pad_link (srcpad, pad); + gst_object_unref (srcpad); + + streampad = gst_element_get_static_pad (appsink, "sink"); + + priv->pending_pipeline_elements = + g_list_prepend (priv->pending_pipeline_elements, appsink); + } else { + sinkpad = gst_element_get_static_pad (appsink, "sink"); + + gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + + streampad = gst_element_get_static_pad (appsrc, "src"); + + priv->pending_pipeline_elements = + g_list_prepend (priv->pending_pipeline_elements, appsrc); + } + + g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live", + TRUE, NULL); + g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL); + + data = g_new0 (AppSinkSrcData, 1); + data->appsink = appsink; + data->appsrc = appsrc; + + sinkpad = gst_element_get_static_pad (appsink, "sink"); + gst_pad_add_probe (sinkpad, + GST_PAD_PROBE_TYPE_EVENT_UPSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, + appsink_pad_probe, data, NULL); + gst_object_unref (sinkpad); + + srcpad = gst_element_get_static_pad (appsrc, "src"); + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_QUERY_UPSTREAM, + appsrc_pad_probe, data, NULL); + gst_object_unref (srcpad); + + gst_app_sink_set_callbacks (GST_APP_SINK (appsink), &appsink_callbacks, + data, NULL); + g_object_set_data_full (G_OBJECT (streampad), "media-appsink-appsrc", data, + g_free); + } else { + streampad = gst_ghost_pad_new (name, pad); + gst_pad_set_active (streampad, TRUE); + gst_element_add_pad (priv->element, streampad); + } g_free (name); - stream = gst_rtsp_stream_new (idx, payloader, ghostpad); + stream = gst_rtsp_stream_new (idx, payloader, streampad); + if (data) + data->stream = stream; if (priv->pool) gst_rtsp_stream_set_address_pool (stream, priv->pool); gst_rtsp_stream_set_multicast_iface (stream, priv->multicast_iface); @@ -1943,13 +2084,28 @@ gst_rtsp_media_remove_stream (GstRTSPMedia * media, GstRTSPStream * stream) { GstRTSPMediaPrivate *priv; GstPad *srcpad; + AppSinkSrcData *data; priv = media->priv; g_mutex_lock (&priv->lock); /* remove the ghostpad */ srcpad = gst_rtsp_stream_get_srcpad (stream); - gst_element_remove_pad (priv->element, srcpad); + data = g_object_get_data (G_OBJECT (srcpad), "media-appsink-appsrc"); + if (data) { + if (GST_OBJECT_PARENT (data->appsrc) == GST_OBJECT_CAST (priv->pipeline)) + gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsrc); + else if (GST_OBJECT_PARENT (data->appsrc) == + GST_OBJECT_CAST (priv->element)) + gst_bin_remove (GST_BIN_CAST (priv->element), data->appsrc); + if (GST_OBJECT_PARENT (data->appsink) == GST_OBJECT_CAST (priv->pipeline)) + gst_bin_remove (GST_BIN_CAST (priv->pipeline), data->appsink); + else if (GST_OBJECT_PARENT (data->appsink) == + GST_OBJECT_CAST (priv->element)) + gst_bin_remove (GST_BIN_CAST (priv->element), data->appsink); + } else { + gst_element_remove_pad (priv->element, srcpad); + } gst_object_unref (srcpad); /* now remove the stream */ g_object_ref (stream); |