diff options
author | Göran Jönsson <goranjn@axis.com> | 2019-01-16 12:59:11 +0100 |
---|---|---|
committer | Sebastian Dröge <slomo@coaxion.net> | 2019-02-02 10:42:33 +0000 |
commit | afb27f91cfec706d2a4dbf8a6c787504731035a3 (patch) | |
tree | 47c02a49b078b1dd7b1185fa734c62ee1fb5f83f | |
parent | 4be7424de5f6828427231d3f30f2c745d61e0be7 (diff) |
rtsp-server: remove recursive behavior
Introduce a threadpool to send rtp and rtcp to avoid recursive behavior.
-rw-r--r-- | gst/rtsp-server/rtsp-client.c | 1 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-stream.c | 115 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-stream.h | 3 |
3 files changed, 43 insertions, 76 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index e48440e..3a839ec 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -2594,7 +2594,6 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) g_object_ref (trans); add_data_seq (client, ct->interleaved.min); add_data_seq (client, ct->interleaved.max); - gst_rtsp_stream_set_watch_context (stream, priv->watch_context); } /* create and serialize the server transport */ diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 1161257..b8759fc 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -181,7 +181,7 @@ struct _GstRTSPStreamPrivate GHashTable *ptmap; GstRTSPPublishClockMode publish_clock_mode; - GMainContext *watch_context; + GThreadPool *send_pool; }; #define DEFAULT_CONTROL NULL @@ -298,7 +298,7 @@ gst_rtsp_stream_init (GstRTSPStream * stream) NULL, (GDestroyNotify) gst_caps_unref); priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); - priv->watch_context = NULL; + priv->send_pool = NULL; } typedef struct _UdpClientAddrInfo UdpClientAddrInfo; @@ -334,6 +334,8 @@ gst_rtsp_stream_finalize (GObject * obj) /* we really need to be unjoined now */ g_return_if_fail (priv->joined_bin == NULL); + if (priv->send_pool) + g_thread_pool_free (priv->send_pool, TRUE, TRUE); if (priv->mcast_addr_v4) gst_rtsp_address_free (priv->mcast_addr_v4); if (priv->mcast_addr_v6) @@ -378,9 +380,6 @@ gst_rtsp_stream_finalize (GObject * obj) g_hash_table_unref (priv->keys); g_hash_table_destroy (priv->ptmap); - if (priv->watch_context) - g_main_context_unref (priv->watch_context); - G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -2569,6 +2568,34 @@ send_tcp_message (GstRTSPStream * stream, gint idx) g_mutex_lock (&priv->lock); } +static void +send_thread_main (gpointer data, gpointer user_data) +{ + GstRTSPStream *stream = user_data; + GstRTSPStreamPrivate *priv = stream->priv; + gint idx; + gint i; + + g_mutex_lock (&priv->lock); + do { + idx = -1; + /* iterate from 1 and down, so we prioritize RTCP over RTP */ + for (i = 1; i >= 0; i--) { + if (priv->have_buffer[i]) { + /* send message */ + idx = i; + break; + } + } + + if (idx != -1 && priv->n_outstanding == 0) + send_tcp_message (stream, idx); + } while (idx != -1 && priv->n_outstanding == 0); + + GST_DEBUG_OBJECT (stream, "send thread done"); + g_mutex_unlock (&priv->lock); +} + static GstFlowReturn handle_new_sample (GstAppSink * sink, gpointer user_data) { @@ -2579,6 +2606,12 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) g_mutex_lock (&priv->lock); + if (priv->send_pool == NULL) { + GST_DEBUG_OBJECT (stream, "create thread pool"); + priv->send_pool = + g_thread_pool_new (send_thread_main, user_data, 1, TRUE, NULL); + } + for (i = 0; i < 2; i++) if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) { priv->have_buffer[i] = TRUE; @@ -4360,37 +4393,12 @@ mcast_error: } } -static gboolean -cb_send_tcp_message (GstRTSPStream * stream) -{ - GstRTSPStreamPrivate *priv = stream->priv; - gint idx = -1; - gint i; - - g_mutex_lock (&priv->lock); - - /* iterate from 1 and down, so we prioritize RTCP over RTP */ - for (i = 1; i >= 0; i--) { - if (priv->have_buffer[i]) { - /* send message */ - idx = i; - break; - } - } - - if (idx != -1) - send_tcp_message (stream, idx); - g_mutex_unlock (&priv->lock); - return G_SOURCE_REMOVE; -} - static void on_message_sent (gpointer user_data) { GstRTSPStream *stream = user_data; GstRTSPStreamPrivate *priv = stream->priv; gint idx = -1; - GSource *idle_src; GST_DEBUG_OBJECT (stream, "message send complete"); @@ -4416,24 +4424,12 @@ on_message_sent (gpointer user_data) } if (idx != -1) { - /* When appsink running this callback we want to send as much as we can - * But when idle callback or watch callback is running we will first - * queue an idle probe. This so we prevent a loop to occur were callback - * is sending more data that then call the callback that sends more data - * and so on. If the loop occur then it will starve out handling off - * other events that are handled by watch's context. */ - if (priv->watch_context && g_main_context_is_owner (priv->watch_context)) { - /* underlaying layer is running this callback */ - idle_src = g_idle_source_new (); - g_source_set_callback (idle_src, (GSourceFunc) cb_send_tcp_message, - g_object_ref (stream), g_object_unref); - g_source_attach (idle_src, priv->watch_context); - g_source_unref (idle_src); - } else { - /* appsink is running this callback */ - send_tcp_message (stream, idx); - } + gint dummy; + + GST_DEBUG_OBJECT (stream, "start thread"); + g_thread_pool_push (priv->send_pool, &dummy, NULL); } + g_mutex_unlock (&priv->lock); return; @@ -5802,28 +5798,3 @@ gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream) return res; } - -/** - * gst_rtsp_stream_set_watch_context: - * @stream: a #GstRTSPStream - * @context: a #GMainContext - * - * Sets stream private watch_context. - * - */ -void -gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, - GMainContext * context) -{ - GstRTSPStreamPrivate *priv; - priv = stream->priv; - - g_mutex_lock (&priv->lock); - if (priv->watch_context != NULL) { - g_main_context_unref (priv->watch_context); - priv->watch_context = NULL; - } - if (context) - priv->watch_context = g_main_context_ref (context); - g_mutex_unlock (&priv->lock); -} diff --git a/gst/rtsp-server/rtsp-stream.h b/gst/rtsp-server/rtsp-stream.h index 53ad57c..7910bb0 100644 --- a/gst/rtsp-server/rtsp-stream.h +++ b/gst/rtsp-server/rtsp-stream.h @@ -354,9 +354,6 @@ void gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream *stream, GST_RTSP_SERVER_API guint gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream *stream); -GST_RTSP_SERVER_API -void gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, GMainContext * context); - /** * GstRTSPStreamTransportFilterFunc: * @stream: a #GstRTSPStream object |