diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2016-09-05 18:04:50 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2016-10-25 15:53:49 +0300 |
commit | 8084e1a9845014d1e00d4f9aacb371d52a5dc98b (patch) | |
tree | f9302ffdb3acf136349924933149bfa877020e35 | |
parent | 0befc7ea9acbed2701a5adc65e5bb818b06b5830 (diff) |
rtsp-stream: Always create multicast UDP elements if the protocol flag is set
Adding them later will cause deadlocks due to
1) pre-rolling and staying in PAUSED with the unicast/TCP sinks
2) adding the multicast sink
3) waiting for it to get data to preroll again
3) never happens because the queues after the tee are full.
-rw-r--r-- | gst/rtsp-server/rtsp-stream.c | 215 |
1 files changed, 79 insertions, 136 deletions
diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 132d8bd..2a88c29 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -124,8 +124,6 @@ struct _GstRTSPStreamPrivate GstRTSPAddressPool *pool; /* unicast server addr/port */ - GstRTSPRange server_port_v4; - GstRTSPRange server_port_v6; GstRTSPAddress *server_addr_v4; GstRTSPAddress *server_addr_v6; @@ -1136,7 +1134,7 @@ error: static gboolean alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, GstElement * udpsrc_out[2], GstElement * udpsink_out[2], - GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out) + GstRTSPAddress ** server_addr_out, gboolean multicast) { GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; @@ -1182,13 +1180,21 @@ again: g_socket_set_multicast_loopback (rtp_socket, FALSE); } - if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) { + if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) { GstRTSPAddressFlags flags; if (addr) rejected_addresses = g_list_prepend (rejected_addresses, addr); - flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST; + if (!pool) + goto no_ports; + + flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; + if (multicast) + flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST; + else + flags |= GST_RTSP_ADDRESS_FLAG_UNICAST; + if (family == G_SOCKET_FAMILY_IPV6) flags |= GST_RTSP_ADDRESS_FLAG_IPV6; else @@ -1250,21 +1256,20 @@ again: } g_object_unref (rtcp_sockaddr); - if (addr == NULL) - addr_str = g_inet_address_to_string (inetaddr); - else - addr_str = addr->address; + if (!addr) { + addr = g_slice_new0 (GstRTSPAddress); + addr->address = g_inet_address_to_string (inetaddr); + addr->port = tmp_rtp; + addr->n_ports = 2; + } + + addr_str = addr->address; g_clear_object (&inetaddr); if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { - if (addr == NULL) - g_free (addr_str); goto no_udp_protocol; } - if (addr == NULL) - g_free (addr_str); - g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); @@ -1272,18 +1277,21 @@ again: if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) goto port_error; - server_port_out->min = rtpport; - server_port_out->max = rtcpport; - /* This function is called twice (for v4 and v6) but we create only one pair * of udpsinks. */ if (!udpsink_out[0] && !create_and_configure_udpsinks (stream, udpsink_out)) goto no_udp_protocol; + if (multicast) { + g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL); + g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL); + } + set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); *server_addr_out = addr; + g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); g_object_unref (rtp_socket); @@ -1398,15 +1406,21 @@ alloc_ports (GstRTSPStream * stream) GstRTSPStreamPrivate *priv = stream->priv; gboolean ret = TRUE; - if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || - (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) { + if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) { ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->udpsrc_v4, priv->udpsink, - &priv->server_port_v4, &priv->server_addr_v4); + priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE); + + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE); + } + + /* FIXME: Maybe actually consider the return values? */ + if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) { + ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE); ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->udpsrc_v6, priv->udpsink, - &priv->server_port_v6, &priv->server_addr_v6); + priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE); } return ret; @@ -1433,11 +1447,17 @@ gst_rtsp_stream_get_server_port (GstRTSPStream * stream, g_mutex_lock (&priv->lock); if (family == G_SOCKET_FAMILY_IPV4) { - if (server_port) - *server_port = priv->server_port_v4; + if (server_port) { + server_port->min = priv->server_addr_v4->port; + server_port->max = + priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1; + } } else { - if (server_port) - *server_port = priv->server_port_v6; + if (server_port) { + server_port->min = priv->server_addr_v6->port; + server_port->max = + priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1; + } } g_mutex_unlock (&priv->lock); } @@ -2220,7 +2240,7 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) { GstRTSPStreamPrivate *priv; GstPad *pad; - gboolean is_tcp = FALSE, is_udp = FALSE; + gboolean is_tcp, is_udp; gint i; priv = stream->priv; @@ -2277,7 +2297,12 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); + if (priv->udpsink[i]) + plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); + + if (priv->mcast_udpsink[i]) + plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i], + &priv->mcast_udpqueue[i]); if (is_tcp) { g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); @@ -2300,12 +2325,16 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) if (state != GST_STATE_NULL) { if (priv->udpsink[i]) gst_element_set_state (priv->udpsink[i], state); + if (priv->mcast_udpsink[i]) + gst_element_set_state (priv->mcast_udpsink[i], state); if (priv->appsink[i]) gst_element_set_state (priv->appsink[i], state); if (priv->appqueue[i]) gst_element_set_state (priv->appqueue[i], state); if (priv->udpqueue[i]) gst_element_set_state (priv->udpqueue[i], state); + if (priv->mcast_udpqueue[i]) + gst_element_set_state (priv->mcast_udpqueue[i], state); if (priv->tee[i]) gst_element_set_state (priv->tee[i], state); } @@ -2388,6 +2417,12 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) if (priv->udpsrc_v6[i]) plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); + if (priv->mcast_udpsrc_v4[i]) + plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]); + + if (priv->mcast_udpsrc_v6[i]) + plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]); + if (is_tcp) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); @@ -2405,19 +2440,13 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) } static gboolean -create_mcast_part_for_transport (GstRTSPStream * stream, +check_mcast_part_for_transport (GstRTSPStream * stream, const GstRTSPTransport * tr) { GstRTSPStreamPrivate *priv = stream->priv; GInetAddress *inetaddr; GSocketFamily family; GstRTSPAddress *mcast_addr; - GstElement **mcast_udpsrc; - GSocket *rtp_socket = NULL; - GSocket *rtcp_socket = NULL; - GSocketAddress *rtp_sockaddr = NULL; - GSocketAddress *rtcp_sockaddr = NULL; - GError *error = NULL; /* Check if it's a ipv4 or ipv6 transport */ inetaddr = g_inet_address_new_from_string (tr->destination); @@ -2427,10 +2456,8 @@ create_mcast_part_for_transport (GstRTSPStream * stream, /* Select fields corresponding to the family */ if (family == G_SOCKET_FAMILY_IPV4) { mcast_addr = priv->mcast_addr_v4; - mcast_udpsrc = priv->mcast_udpsrc_v4; } else { mcast_addr = priv->mcast_addr_v6; - mcast_udpsrc = priv->mcast_udpsrc_v6; } /* We support only one mcast group per family, make sure this transport @@ -2444,90 +2471,6 @@ create_mcast_part_for_transport (GstRTSPStream * stream, tr->ttl != mcast_addr->ttl) goto wrong_addr; - if (mcast_udpsrc[0]) { - /* We already created elements for this family. Since we support only one - * mcast group per family, there is nothing more to do here. */ - g_assert (mcast_udpsrc[1]); - g_assert (priv->mcast_udpqueue[0]); - g_assert (priv->mcast_udpqueue[1]); - g_assert (priv->mcast_udpsink[0]); - g_assert (priv->mcast_udpsink[1]); - return TRUE; - } - - g_assert (!mcast_udpsrc[1]); - - /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */ - rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, - G_SOCKET_PROTOCOL_UDP, &error); - if (!rtp_socket) - goto socket_error; - g_socket_set_multicast_loopback (rtp_socket, FALSE); - - rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, - G_SOCKET_PROTOCOL_UDP, &error); - if (!rtcp_socket) - goto socket_error; - g_socket_set_multicast_loopback (rtcp_socket, FALSE); - - inetaddr = g_inet_address_new_any (family); - rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port); - rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1); - g_object_unref (inetaddr); - - if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error)) - goto socket_error; - - if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error)) - goto socket_error; - - g_object_unref (rtp_sockaddr); - g_object_unref (rtcp_sockaddr); - - /* Add receiver part */ - create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket); - if (priv->sinkpad) { - plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]); - gst_element_sync_state_with_parent (mcast_udpsrc[0]); - } - plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]); - gst_element_sync_state_with_parent (mcast_udpsrc[1]); - - /* Add sender part, could already have been created for the other family. */ - if (!priv->mcast_udpsink[0]) { - g_assert (!priv->mcast_udpsink[1]); - g_assert (!priv->mcast_udpqueue[0]); - g_assert (!priv->mcast_udpqueue[1]); - - create_and_configure_udpsinks (stream, priv->mcast_udpsink); - - g_signal_emit_by_name (priv->mcast_udpsink[0], "add", mcast_addr->address, - mcast_addr->port, NULL); - g_signal_emit_by_name (priv->mcast_udpsink[1], "add", mcast_addr->address, - mcast_addr->port + 1, NULL); - - set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket, - family); - - if (priv->srcpad) { - plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0], - &priv->mcast_udpqueue[0]); - gst_element_sync_state_with_parent (priv->mcast_udpsink[0]); - gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]); - } - plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1], - &priv->mcast_udpqueue[1]); - gst_element_sync_state_with_parent (priv->mcast_udpsink[1]); - gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]); - } else { - g_assert (priv->mcast_udpsink[1]); - g_assert (priv->mcast_udpqueue[0]); - g_assert (priv->mcast_udpqueue[1]); - - set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket, - family); - } - return TRUE; no_addr: @@ -2542,17 +2485,6 @@ wrong_addr: "the reserved address"); return FALSE; } -socket_error: - { - GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s", - error->message); - g_clear_object (&rtp_socket); - g_clear_object (&rtcp_socket); - g_clear_object (&rtp_sockaddr); - g_clear_object (&rtcp_sockaddr); - g_clear_error (&error); - return FALSE; - } } /** @@ -2817,6 +2749,19 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, if (priv->srtpdec) gst_object_unref (priv->srtpdec); + if (priv->mcast_addr_v4) + gst_rtsp_address_free (priv->mcast_addr_v4); + priv->mcast_addr_v4 = NULL; + if (priv->mcast_addr_v6) + gst_rtsp_address_free (priv->mcast_addr_v6); + priv->mcast_addr_v6 = NULL; + if (priv->server_addr_v4) + gst_rtsp_address_free (priv->server_addr_v4); + priv->server_addr_v4 = NULL; + if (priv->server_addr_v6) + gst_rtsp_address_free (priv->server_addr_v6); + priv->server_addr_v6 = NULL; + g_clear_object (&priv->joined_bin); g_mutex_unlock (&priv->lock); @@ -3151,13 +3096,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, case GST_RTSP_LOWER_TRANS_UDP_MCAST: { if (add) { - if (!create_mcast_part_for_transport (stream, tr)) + if (!check_mcast_part_for_transport (stream, tr)) goto mcast_error; priv->transports = g_list_prepend (priv->transports, trans); } else { priv->transports = g_list_remove (priv->transports, trans); - /* FIXME: Check if there are remaining mcast transports, and destroy - * mcast part if its now unused */ } break; } |