summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2016-09-05 18:04:50 +0300
committerSebastian Dröge <sebastian@centricular.com>2016-10-25 15:53:49 +0300
commit8084e1a9845014d1e00d4f9aacb371d52a5dc98b (patch)
treef9302ffdb3acf136349924933149bfa877020e35
parent0befc7ea9acbed2701a5adc65e5bb818b06b5830 (diff)
rtsp-stream: Always create multicast UDP elements if the protocol flag is set
Adding them later will cause deadlocks due to 1) pre-rolling and staying in PAUSED with the unicast/TCP sinks 2) adding the multicast sink 3) waiting for it to get data to preroll again 3) never happens because the queues after the tee are full.
-rw-r--r--gst/rtsp-server/rtsp-stream.c215
1 files changed, 79 insertions, 136 deletions
diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c
index 132d8bd..2a88c29 100644
--- a/gst/rtsp-server/rtsp-stream.c
+++ b/gst/rtsp-server/rtsp-stream.c
@@ -124,8 +124,6 @@ struct _GstRTSPStreamPrivate
GstRTSPAddressPool *pool;
/* unicast server addr/port */
- GstRTSPRange server_port_v4;
- GstRTSPRange server_port_v6;
GstRTSPAddress *server_addr_v4;
GstRTSPAddress *server_addr_v6;
@@ -1136,7 +1134,7 @@ error:
static gboolean
alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
- GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out)
+ GstRTSPAddress ** server_addr_out, gboolean multicast)
{
GstRTSPStreamPrivate *priv = stream->priv;
GSocket *rtp_socket = NULL;
@@ -1182,13 +1180,21 @@ again:
g_socket_set_multicast_loopback (rtp_socket, FALSE);
}
- if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
+ if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
GstRTSPAddressFlags flags;
if (addr)
rejected_addresses = g_list_prepend (rejected_addresses, addr);
- flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
+ if (!pool)
+ goto no_ports;
+
+ flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
+ if (multicast)
+ flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
+ else
+ flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
+
if (family == G_SOCKET_FAMILY_IPV6)
flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
else
@@ -1250,21 +1256,20 @@ again:
}
g_object_unref (rtcp_sockaddr);
- if (addr == NULL)
- addr_str = g_inet_address_to_string (inetaddr);
- else
- addr_str = addr->address;
+ if (!addr) {
+ addr = g_slice_new0 (GstRTSPAddress);
+ addr->address = g_inet_address_to_string (inetaddr);
+ addr->port = tmp_rtp;
+ addr->n_ports = 2;
+ }
+
+ addr_str = addr->address;
g_clear_object (&inetaddr);
if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
- if (addr == NULL)
- g_free (addr_str);
goto no_udp_protocol;
}
- if (addr == NULL)
- g_free (addr_str);
-
g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
@@ -1272,18 +1277,21 @@ again:
if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
goto port_error;
- server_port_out->min = rtpport;
- server_port_out->max = rtcpport;
-
/* This function is called twice (for v4 and v6) but we create only one pair
* of udpsinks. */
if (!udpsink_out[0]
&& !create_and_configure_udpsinks (stream, udpsink_out))
goto no_udp_protocol;
+ if (multicast) {
+ g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
+ g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
+ }
+
set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
*server_addr_out = addr;
+
g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
g_object_unref (rtp_socket);
@@ -1398,15 +1406,21 @@ alloc_ports (GstRTSPStream * stream)
GstRTSPStreamPrivate *priv = stream->priv;
gboolean ret = TRUE;
- if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
- (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
+ if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
- priv->udpsrc_v4, priv->udpsink,
- &priv->server_port_v4, &priv->server_addr_v4);
+ priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
+
+ ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
+ priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
+ }
+
+ /* FIXME: Maybe actually consider the return values? */
+ if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
+ ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
+ priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
- priv->udpsrc_v6, priv->udpsink,
- &priv->server_port_v6, &priv->server_addr_v6);
+ priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
}
return ret;
@@ -1433,11 +1447,17 @@ gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
g_mutex_lock (&priv->lock);
if (family == G_SOCKET_FAMILY_IPV4) {
- if (server_port)
- *server_port = priv->server_port_v4;
+ if (server_port) {
+ server_port->min = priv->server_addr_v4->port;
+ server_port->max =
+ priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
+ }
} else {
- if (server_port)
- *server_port = priv->server_port_v6;
+ if (server_port) {
+ server_port->min = priv->server_addr_v6->port;
+ server_port->max =
+ priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
+ }
}
g_mutex_unlock (&priv->lock);
}
@@ -2220,7 +2240,7 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
{
GstRTSPStreamPrivate *priv;
GstPad *pad;
- gboolean is_tcp = FALSE, is_udp = FALSE;
+ gboolean is_tcp, is_udp;
gint i;
priv = stream->priv;
@@ -2277,7 +2297,12 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
gst_pad_link (priv->send_src[i], pad);
gst_object_unref (pad);
- plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
+ if (priv->udpsink[i])
+ plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
+
+ if (priv->mcast_udpsink[i])
+ plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
+ &priv->mcast_udpqueue[i]);
if (is_tcp) {
g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
@@ -2300,12 +2325,16 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
if (state != GST_STATE_NULL) {
if (priv->udpsink[i])
gst_element_set_state (priv->udpsink[i], state);
+ if (priv->mcast_udpsink[i])
+ gst_element_set_state (priv->mcast_udpsink[i], state);
if (priv->appsink[i])
gst_element_set_state (priv->appsink[i], state);
if (priv->appqueue[i])
gst_element_set_state (priv->appqueue[i], state);
if (priv->udpqueue[i])
gst_element_set_state (priv->udpqueue[i], state);
+ if (priv->mcast_udpqueue[i])
+ gst_element_set_state (priv->mcast_udpqueue[i], state);
if (priv->tee[i])
gst_element_set_state (priv->tee[i], state);
}
@@ -2388,6 +2417,12 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
if (priv->udpsrc_v6[i])
plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
+ if (priv->mcast_udpsrc_v4[i])
+ plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
+
+ if (priv->mcast_udpsrc_v6[i])
+ plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
+
if (is_tcp) {
/* make and add appsrc */
priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
@@ -2405,19 +2440,13 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
}
static gboolean
-create_mcast_part_for_transport (GstRTSPStream * stream,
+check_mcast_part_for_transport (GstRTSPStream * stream,
const GstRTSPTransport * tr)
{
GstRTSPStreamPrivate *priv = stream->priv;
GInetAddress *inetaddr;
GSocketFamily family;
GstRTSPAddress *mcast_addr;
- GstElement **mcast_udpsrc;
- GSocket *rtp_socket = NULL;
- GSocket *rtcp_socket = NULL;
- GSocketAddress *rtp_sockaddr = NULL;
- GSocketAddress *rtcp_sockaddr = NULL;
- GError *error = NULL;
/* Check if it's a ipv4 or ipv6 transport */
inetaddr = g_inet_address_new_from_string (tr->destination);
@@ -2427,10 +2456,8 @@ create_mcast_part_for_transport (GstRTSPStream * stream,
/* Select fields corresponding to the family */
if (family == G_SOCKET_FAMILY_IPV4) {
mcast_addr = priv->mcast_addr_v4;
- mcast_udpsrc = priv->mcast_udpsrc_v4;
} else {
mcast_addr = priv->mcast_addr_v6;
- mcast_udpsrc = priv->mcast_udpsrc_v6;
}
/* We support only one mcast group per family, make sure this transport
@@ -2444,90 +2471,6 @@ create_mcast_part_for_transport (GstRTSPStream * stream,
tr->ttl != mcast_addr->ttl)
goto wrong_addr;
- if (mcast_udpsrc[0]) {
- /* We already created elements for this family. Since we support only one
- * mcast group per family, there is nothing more to do here. */
- g_assert (mcast_udpsrc[1]);
- g_assert (priv->mcast_udpqueue[0]);
- g_assert (priv->mcast_udpqueue[1]);
- g_assert (priv->mcast_udpsink[0]);
- g_assert (priv->mcast_udpsink[1]);
- return TRUE;
- }
-
- g_assert (!mcast_udpsrc[1]);
-
- /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */
- rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
- G_SOCKET_PROTOCOL_UDP, &error);
- if (!rtp_socket)
- goto socket_error;
- g_socket_set_multicast_loopback (rtp_socket, FALSE);
-
- rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
- G_SOCKET_PROTOCOL_UDP, &error);
- if (!rtcp_socket)
- goto socket_error;
- g_socket_set_multicast_loopback (rtcp_socket, FALSE);
-
- inetaddr = g_inet_address_new_any (family);
- rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port);
- rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1);
- g_object_unref (inetaddr);
-
- if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error))
- goto socket_error;
-
- if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error))
- goto socket_error;
-
- g_object_unref (rtp_sockaddr);
- g_object_unref (rtcp_sockaddr);
-
- /* Add receiver part */
- create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket);
- if (priv->sinkpad) {
- plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]);
- gst_element_sync_state_with_parent (mcast_udpsrc[0]);
- }
- plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]);
- gst_element_sync_state_with_parent (mcast_udpsrc[1]);
-
- /* Add sender part, could already have been created for the other family. */
- if (!priv->mcast_udpsink[0]) {
- g_assert (!priv->mcast_udpsink[1]);
- g_assert (!priv->mcast_udpqueue[0]);
- g_assert (!priv->mcast_udpqueue[1]);
-
- create_and_configure_udpsinks (stream, priv->mcast_udpsink);
-
- g_signal_emit_by_name (priv->mcast_udpsink[0], "add", mcast_addr->address,
- mcast_addr->port, NULL);
- g_signal_emit_by_name (priv->mcast_udpsink[1], "add", mcast_addr->address,
- mcast_addr->port + 1, NULL);
-
- set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
- family);
-
- if (priv->srcpad) {
- plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0],
- &priv->mcast_udpqueue[0]);
- gst_element_sync_state_with_parent (priv->mcast_udpsink[0]);
- gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]);
- }
- plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1],
- &priv->mcast_udpqueue[1]);
- gst_element_sync_state_with_parent (priv->mcast_udpsink[1]);
- gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]);
- } else {
- g_assert (priv->mcast_udpsink[1]);
- g_assert (priv->mcast_udpqueue[0]);
- g_assert (priv->mcast_udpqueue[1]);
-
- set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
- family);
- }
-
return TRUE;
no_addr:
@@ -2542,17 +2485,6 @@ wrong_addr:
"the reserved address");
return FALSE;
}
-socket_error:
- {
- GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s",
- error->message);
- g_clear_object (&rtp_socket);
- g_clear_object (&rtcp_socket);
- g_clear_object (&rtp_sockaddr);
- g_clear_object (&rtcp_sockaddr);
- g_clear_error (&error);
- return FALSE;
- }
}
/**
@@ -2817,6 +2749,19 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
if (priv->srtpdec)
gst_object_unref (priv->srtpdec);
+ if (priv->mcast_addr_v4)
+ gst_rtsp_address_free (priv->mcast_addr_v4);
+ priv->mcast_addr_v4 = NULL;
+ if (priv->mcast_addr_v6)
+ gst_rtsp_address_free (priv->mcast_addr_v6);
+ priv->mcast_addr_v6 = NULL;
+ if (priv->server_addr_v4)
+ gst_rtsp_address_free (priv->server_addr_v4);
+ priv->server_addr_v4 = NULL;
+ if (priv->server_addr_v6)
+ gst_rtsp_address_free (priv->server_addr_v6);
+ priv->server_addr_v6 = NULL;
+
g_clear_object (&priv->joined_bin);
g_mutex_unlock (&priv->lock);
@@ -3151,13 +3096,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
case GST_RTSP_LOWER_TRANS_UDP_MCAST:
{
if (add) {
- if (!create_mcast_part_for_transport (stream, tr))
+ if (!check_mcast_part_for_transport (stream, tr))
goto mcast_error;
priv->transports = g_list_prepend (priv->transports, trans);
} else {
priv->transports = g_list_remove (priv->transports, trans);
- /* FIXME: Check if there are remaining mcast transports, and destroy
- * mcast part if its now unused */
}
break;
}