diff options
author | Patricia Muscalu <patricia@axis.com> | 2017-10-17 10:44:33 +0200 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2017-11-15 19:56:15 +0200 |
commit | a7732a68e8bc6b4ba15629c652056c240c624ff0 (patch) | |
tree | b22bd110efd03bd22554939af4b60a1bd242e340 | |
parent | 930a602e17e4bfdf55522011d3d132bc6777452b (diff) |
Dynamically reconfigure pipeline in PLAY based on transports
The initial pipeline does not contain specific transport
elements. The receiver and the sender parts are added
after PLAY.
If the media is shared, the streams are dynamically
reconfigured after each PLAY.
https://bugzilla.gnome.org/show_bug.cgi?id=788340
-rw-r--r-- | gst/rtsp-server/rtsp-client.c | 114 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-media.c | 109 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-media.h | 3 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-stream.c | 1008 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-stream.h | 9 | ||||
-rw-r--r-- | tests/check/gst/client.c | 3 | ||||
-rw-r--r-- | tests/check/gst/media.c | 103 | ||||
-rw-r--r-- | tests/check/gst/rtspserver.c | 259 | ||||
-rw-r--r-- | tests/check/gst/stream.c | 75 |
9 files changed, 1340 insertions, 343 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 45d178a..83131f3 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -1518,6 +1518,7 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx) gint matched; gchar *seek_style = NULL; GstRTSPStatusCode sig_result; + GPtrArray *transports; if (!(session = ctx->session)) goto no_session; @@ -1556,6 +1557,14 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx) if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY) goto invalid_state; + /* update the pipeline */ + transports = gst_rtsp_session_media_get_transports (sessmedia); + if (!gst_rtsp_media_complete_pipeline (media, transports)) { + g_ptr_array_unref (transports); + goto pipeline_error; + } + g_ptr_array_unref (transports); + /* in play we first unsuspend, media could be suspended from SDP or PAUSED */ if (!gst_rtsp_media_unsuspend (media)) goto unsuspend_failed; @@ -1666,6 +1675,13 @@ invalid_state: ctx); return FALSE; } +pipeline_error: + { + GST_ERROR ("client %p: failed to configure the pipeline", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + ctx); + return FALSE; + } unsuspend_failed: { GST_ERROR ("client %p: unsuspend failed", client); @@ -1784,39 +1800,52 @@ default_configure_client_transport (GstRTSPClient * client, GstRTSPClientPrivate *priv = client->priv; /* we have a valid transport now, set the destination of the client. */ - if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - gboolean use_client_settings; - - use_client_settings = - gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS); - - if (ct->destination && use_client_settings) { - GstRTSPAddress *addr; - - addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination, - ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl); - - if (addr == NULL) - goto no_address; - - gst_rtsp_address_free (addr); + if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST || + ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP) { + + /* allocate UDP ports */ + GSocketFamily family; + gboolean use_client_settings = FALSE; + + family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; + if ((ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) && + gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS) && + (ct->destination != NULL)) + use_client_settings = TRUE; + + if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, + use_client_settings)) + goto error_allocating_ports; + + if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { + GstRTSPAddress *addr = NULL; + + if (use_client_settings) { + /* the address has been successfully allocated, let's check if it's + * the one requested by the client */ + addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination, + ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl); + + if (addr == NULL) + goto no_address; + } else { + g_free (ct->destination); + addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family); + if (addr == NULL) + goto no_address; + ct->destination = g_strdup (addr->address); + ct->port.min = addr->port; + ct->port.max = addr->port + addr->n_ports - 1; + ct->ttl = addr->ttl; + + gst_rtsp_address_free (addr); + } } else { - GstRTSPAddress *addr; - GSocketFamily family; - - family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; - - addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family); - if (addr == NULL) - goto no_address; + GstRTSPUrl *url; + url = gst_rtsp_connection_get_url (priv->connection); g_free (ct->destination); - ct->destination = g_strdup (addr->address); - ct->port.min = addr->port; - ct->port.max = addr->port + addr->n_ports - 1; - ct->ttl = addr->ttl; - - gst_rtsp_address_free (addr); + ct->destination = g_strdup (url->host); } } else { GstRTSPUrl *url; @@ -1863,9 +1892,14 @@ default_configure_client_transport (GstRTSPClient * client, return TRUE; /* ERRORS */ +error_allocating_ports: + { + GST_ERROR_OBJECT (client, "Failed to allocate UDP ports"); + return FALSE; + } no_address: { - GST_ERROR_OBJECT (client, "failed to acquire address for stream"); + GST_ERROR_OBJECT (client, "Failed to acquire address for stream"); return FALSE; } } @@ -3036,6 +3070,7 @@ handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx) gchar *path; gint matched; GstRTSPStatusCode sig_result; + GPtrArray *transports; if (!(session = ctx->session)) goto no_session; @@ -3074,7 +3109,15 @@ handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx) if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY) goto invalid_state; - /* in play we first unsuspend, media could be suspended from SDP or PAUSED */ + /* update the pipeline */ + transports = gst_rtsp_session_media_get_transports (sessmedia); + if (!gst_rtsp_media_complete_pipeline (media, transports)) { + g_ptr_array_unref (transports); + goto pipeline_error; + } + g_ptr_array_unref (transports); + + /* in record we first unsuspend, media could be suspended from SDP or PAUSED */ if (!gst_rtsp_media_unsuspend (media)) goto unsuspend_failed; @@ -3140,6 +3183,13 @@ invalid_state: ctx); return FALSE; } +pipeline_error: + { + GST_ERROR ("client %p: failed to configure the pipeline", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + ctx); + return FALSE; + } unsuspend_failed: { GST_ERROR ("client %p: unsuspend failed", client); diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index d3a1f5a..159dd3d 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -2115,6 +2115,21 @@ media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked) } static void +stream_unblock (GstRTSPStream * stream, GstRTSPMedia * media) +{ + gst_rtsp_stream_unblock_linked (stream); +} + +static void +media_unblock_linked (GstRTSPMedia * media) +{ + GstRTSPMediaPrivate *priv = media->priv; + + GST_DEBUG ("media %p unblocking linked streams", media); + g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media); +} + +static void gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status) { GstRTSPMediaPrivate *priv = media->priv; @@ -2526,8 +2541,6 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message) GST_INFO ("%p: ignoring ASYNC_DONE", media); } else { GST_INFO ("%p: got ASYNC_DONE", media); - collect_media_stats (media); - if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING) gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); } @@ -2642,6 +2655,9 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media) GST_WARNING ("failed to join bin element"); } + if (priv->blocked) + gst_rtsp_stream_set_blocked (stream, TRUE); + priv->adding = FALSE; g_rec_mutex_unlock (&priv->state_lock); @@ -2720,7 +2736,9 @@ start_preroll (GstRTSPMedia * media) GstStateChangeReturn ret; GST_INFO ("setting pipeline to PAUSED for media %p", media); - /* first go to PAUSED */ + + /* start blocked since it is possible that there are no sink elements yet */ + media_streams_set_blocked (media, TRUE); ret = set_target_state (media, GST_STATE_PAUSED, TRUE); switch (ret) { @@ -2737,10 +2755,7 @@ start_preroll (GstRTSPMedia * media) * seeking query in preroll instead */ priv->seekable = -1; priv->is_live = TRUE; - if (!(priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) { - /* start blocked to make sure nothing goes to the sink */ - media_streams_set_blocked (media, TRUE); - } + ret = set_state (media, GST_STATE_PLAYING); if (ret == GST_STATE_CHANGE_FAILURE) goto state_failed; @@ -3100,6 +3115,8 @@ finish_unprepare (GstRTSPMedia * media) set_state (media, GST_STATE_NULL); g_rec_mutex_lock (&priv->state_lock); + media_streams_set_blocked (media, FALSE); + if (priv->status != GST_RTSP_MEDIA_STATUS_UNPREPARING) return; @@ -3209,8 +3226,6 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media) goto is_busy; GST_INFO ("unprepare media %p", media); - if (priv->blocked) - media_streams_set_blocked (media, FALSE); set_target_state (media, GST_STATE_NULL, FALSE); success = TRUE; @@ -3563,7 +3578,6 @@ default_suspend (GstRTSPMedia * media) { GstRTSPMediaPrivate *priv = media->priv; GstStateChangeReturn ret; - gboolean unblock = FALSE; switch (priv->suspend_mode) { case GST_RTSP_SUSPEND_MODE_NONE: @@ -3574,7 +3588,6 @@ default_suspend (GstRTSPMedia * media) ret = set_target_state (media, GST_STATE_PAUSED, TRUE); if (ret == GST_STATE_CHANGE_FAILURE) goto state_failed; - unblock = TRUE; break; case GST_RTSP_SUSPEND_MODE_RESET: GST_DEBUG ("media %p suspend to NULL", media); @@ -3587,16 +3600,11 @@ default_suspend (GstRTSPMedia * media) * is actually from NULL to PLAY will create a new sequence * number. */ g_ptr_array_foreach (priv->streams, (GFunc) do_set_seqnum, NULL); - unblock = TRUE; break; default: break; } - /* let the streams do the state changes freely, if any */ - if (unblock) - media_streams_set_blocked (media, FALSE); - return TRUE; /* ERRORS */ @@ -3674,7 +3682,19 @@ default_unsuspend (GstRTSPMedia * media) switch (priv->suspend_mode) { case GST_RTSP_SUSPEND_MODE_NONE: - gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); + if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) + break; + gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING); + /* at this point the media pipeline has been updated and contain all + * specific transport parts: all active streams contain at least one sink + * element and it's safe to unblock any blocked streams that are active */ + media_unblock_linked (media); + g_rec_mutex_unlock (&priv->state_lock); + if (gst_rtsp_media_get_status (media) == GST_RTSP_MEDIA_STATUS_ERROR) { + g_rec_mutex_lock (&priv->state_lock); + goto preroll_failed; + } + g_rec_mutex_lock (&priv->state_lock); break; case GST_RTSP_SUSPEND_MODE_PAUSE: gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); @@ -3682,6 +3702,10 @@ default_unsuspend (GstRTSPMedia * media) case GST_RTSP_SUSPEND_MODE_RESET: { gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING); + /* at this point the media pipeline has been updated and contain all + * specific transport parts: all active streams contain at least one sink + * element and it's safe to unblock any blocked streams that are active */ + media_unblock_linked (media); if (!start_preroll (media)) goto start_failed; @@ -3771,7 +3795,7 @@ media_set_pipeline_state_locked (GstRTSPMedia * media, GstState state) } else { if (state == GST_STATE_PLAYING) /* make sure pads are not blocking anymore when going to PLAYING */ - media_streams_set_blocked (media, FALSE); + media_unblock_linked (media); set_state (media, state); @@ -4007,3 +4031,52 @@ gst_rtsp_media_seekable (GstRTSPMedia * media) * and no stream is seekable only to the beginning */ return media->priv->seekable; } + +/** + * gst_rtsp_media_complete_pipeline: + * @media: a #GstRTSPMedia + * @transports: a list of #GstRTSPTransport + * + * Add a receiver and sender parts to the pipeline based on the transport from + * SETUP. + * + * Returns: %TRUE if the media pipeline has been sucessfully updated. + */ +gboolean +gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports) +{ + GstRTSPMediaPrivate *priv; + guint i; + + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); + g_return_val_if_fail (transports, FALSE); + + GST_DEBUG_OBJECT (media, "complete pipeline"); + + priv = media->priv; + + g_mutex_lock (&priv->lock); + for (i = 0; i < priv->streams->len; i++) { + GstRTSPStreamTransport *transport; + GstRTSPStream *stream; + const GstRTSPTransport *rtsp_transport; + + transport = g_ptr_array_index (transports, i); + if (!transport) + continue; + + stream = gst_rtsp_stream_transport_get_stream (transport); + if (!stream) + continue; + + rtsp_transport = gst_rtsp_stream_transport_get_transport (transport); + + if (!gst_rtsp_stream_complete_stream (stream, rtsp_transport)) { + g_mutex_unlock (&priv->lock); + return FALSE; + } + } + g_mutex_unlock (&priv->lock); + + return TRUE; +} diff --git a/gst/rtsp-server/rtsp-media.h b/gst/rtsp-server/rtsp-media.h index 9c63d8f..d78265d 100644 --- a/gst/rtsp-server/rtsp-media.h +++ b/gst/rtsp-server/rtsp-media.h @@ -382,6 +382,9 @@ GST_EXPORT void gst_rtsp_media_set_pipeline_state (GstRTSPMedia * media, GstState state); +GST_EXPORT +gboolean gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports); + #ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstRTSPMedia, gst_object_unref) #endif diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index ea4f034..9a8f1c2 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -99,12 +99,16 @@ struct _GstRTSPStreamPrivate GstElement *udpsrc_v6[2]; GstElement *udpqueue[2]; GstElement *udpsink[2]; + GSocket *socket_v4[2]; + GSocket *socket_v6[2]; /* for UDP multicast */ GstElement *mcast_udpsrc_v4[2]; GstElement *mcast_udpsrc_v6[2]; GstElement *mcast_udpqueue[2]; GstElement *mcast_udpsink[2]; + GSocket *mcast_socket_v4[2]; + GSocket *mcast_socket_v6[2]; /* for TCP transport */ GstElement *appsrc[2]; @@ -291,6 +295,23 @@ gst_rtsp_stream_finalize (GObject * obj) if (priv->rtxsend) g_object_unref (priv->rtxsend); + if (priv->socket_v4[0]) + g_object_unref (priv->socket_v4[0]); + if (priv->socket_v4[1]) + g_object_unref (priv->socket_v4[1]); + if (priv->socket_v6[0]) + g_object_unref (priv->socket_v6[0]); + if (priv->socket_v6[1]) + g_object_unref (priv->socket_v6[1]); + if (priv->mcast_socket_v4[0]) + g_object_unref (priv->mcast_socket_v4[0]); + if (priv->mcast_socket_v4[1]) + g_object_unref (priv->mcast_socket_v4[1]); + if (priv->mcast_socket_v6[0]) + g_object_unref (priv->mcast_socket_v6[0]); + if (priv->mcast_socket_v6[1]) + g_object_unref (priv->mcast_socket_v6[1]); + g_free (priv->multicast_iface); gst_object_unref (priv->payloader); @@ -588,18 +609,14 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream) /* Update the dscp qos property on the udp sinks */ static void -update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2]) +update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink) { GstRTSPStreamPrivate *priv; priv = stream->priv; - if (udpsink[0]) { - g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL); - } - - if (udpsink[1]) { - g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL); + if (*udpsink) { + g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL); } } @@ -1087,8 +1104,8 @@ different_address: /* must be called with lock */ static void -set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket, - GSocket * rtcp_socket, GSocketFamily family) +set_socket_for_udpsink (GstElement * udpsink, GSocket * socket, + GSocketFamily family) { const gchar *multisink_socket; @@ -1097,95 +1114,172 @@ set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket, else multisink_socket = "socket"; - g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL); - g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL); + g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL); +} + +/* must be called with lock */ +static void +set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket, + GSocketFamily family, const gchar * multicast_iface, + const gchar * addr_str, gint port) +{ + set_socket_for_udpsink (udpsink, socket, family); + + if (multicast_iface) { + g_object_set (G_OBJECT (udpsink), "multicast-iface", + multicast_iface, NULL); + } + + g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL); +} + + +/* must be called with lock */ +static void +set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket, + GSocketFamily family) +{ + set_socket_for_udpsink (udpsink, socket, family); +} + +static guint16 +get_port_from_socket (GSocket * socket) +{ + guint16 port; + GSocketAddress *sockaddr; + GError *err; + + GST_DEBUG ("socket: %p", socket); + sockaddr = g_socket_get_local_address (socket, &err); + if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) { + g_clear_object (&sockaddr); + GST_ERROR ("failed to get sockaddr: %s", err->message); + g_error_free (err); + return 0; + } + + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr)); + g_object_unref (sockaddr); + + return port; } + static gboolean -create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2]) +create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink, + GSocket *socket_v4, GSocket *socket_v6, gboolean multicast, gboolean is_rtp) { GstRTSPStreamPrivate *priv = stream->priv; - GstElement *udpsink0, *udpsink1; - udpsink0 = gst_element_factory_make ("multiudpsink", NULL); - udpsink1 = gst_element_factory_make ("multiudpsink", NULL); + *udpsink = gst_element_factory_make ("multiudpsink", NULL); - if (!udpsink0 || !udpsink1) + if (!*udpsink) goto no_udp_protocol; /* configure sinks */ - g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL); + g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL); + g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL); + if (is_rtp) + g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL); + else + g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL); /* Needs to be async for RECORD streams, otherwise we will never go to * PLAYING because the sinks will wait for data while the udpsrc can't * provide data with timestamps in PAUSED. */ - if (priv->sinkpad) - g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); - - /* join multicast group when adding clients, so we'll start receiving from it. - * We cannot rely on the udpsrc to join the group since its socket is always a - * local unicast one. */ - g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL); - g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL); + if (!is_rtp || priv->sinkpad) + g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL); + if (multicast) { + /* join multicast group when adding clients, so we'll start receiving from it. + * We cannot rely on the udpsrc to join the group since its socket is always a + * local unicast one. */ + g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL); - udpsink[0] = udpsink0; - udpsink[1] = udpsink1; + g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL); + } /* update the dscp qos field in the sinks */ update_dscp_qos (stream, udpsink); + if (priv->server_addr_v4) { + GST_DEBUG_OBJECT (stream, + "udp IPv4, configure udpsinks"); + set_unicast_socket_for_udpsink (*udpsink, socket_v4, + G_SOCKET_FAMILY_IPV4); + } + + if (priv->server_addr_v6) { + GST_DEBUG_OBJECT (stream, + "udp IPv6, configure udpsinks"); + set_unicast_socket_for_udpsink (*udpsink, socket_v6, + G_SOCKET_FAMILY_IPV6); + } + + if (multicast) { + gint port; + if (priv->mcast_addr_v4) { + GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks"); + port = get_port_from_socket (socket_v4); + if (!port) + goto get_port_failed; + set_multicast_socket_for_udpsink (*udpsink, socket_v4, + G_SOCKET_FAMILY_IPV4, priv->multicast_iface, priv->mcast_addr_v4->address, port); + } + + if (priv->mcast_addr_v6) { + GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks"); + port = get_port_from_socket (socket_v6); + if (!port) + goto get_port_failed; + set_multicast_socket_for_udpsink (*udpsink, socket_v6, + G_SOCKET_FAMILY_IPV6, priv->multicast_iface, priv->mcast_addr_v6->address, port); + } + + } + return TRUE; /* ERRORS */ no_udp_protocol: { + GST_ERROR_OBJECT (stream, "failed to create udpsink element"); + return FALSE; + } +get_port_failed: + { + GST_ERROR_OBJECT (stream, "failed to get udp port"); return FALSE; } } /* must be called with lock */ static gboolean -create_and_configure_udpsources (GstElement * udpsrc_out[2], - GSocket * rtp_socket, GSocket * rtcp_socket) +create_and_configure_udpsource (GstElement ** udpsrc, + GSocket * socket) { GstStateChangeReturn ret; - udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL); - udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL); + g_assert (socket != NULL); - if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL) + *udpsrc = gst_element_factory_make ("udpsrc", NULL); + if (*udpsrc == NULL) goto error; - g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL); + g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL); /* The udpsrc cannot do the join because its socket is always a local unicast * one. The udpsink sharing the same socket will do it for us. */ - g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL); + g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL); + g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "close-socket", FALSE, NULL); + g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL); - ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY); - if (ret == GST_STATE_CHANGE_FAILURE) - goto error; - ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY); + ret = gst_element_set_state (*udpsrc, GST_STATE_READY); if (ret == GST_STATE_CHANGE_FAILURE) goto error; @@ -1194,13 +1288,9 @@ create_and_configure_udpsources (GstElement * udpsrc_out[2], /* ERRORS */ error: { - if (udpsrc_out[0]) { - gst_element_set_state (udpsrc_out[0], GST_STATE_NULL); - g_clear_object (&udpsrc_out[0]); - } - if (udpsrc_out[1]) { - gst_element_set_state (udpsrc_out[1], GST_STATE_NULL); - g_clear_object (&udpsrc_out[1]); + if (*udpsrc) { + gst_element_set_state (*udpsrc, GST_STATE_NULL); + g_clear_object (udpsrc); } return FALSE; } @@ -1208,29 +1298,21 @@ error: static gboolean alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, - GstElement * udpsrc_out[2], GstElement * udpsink_out[2], - GstRTSPAddress ** server_addr_out, gboolean multicast) + GSocket *socket_out[2], GstRTSPAddress ** server_addr_out, + gboolean multicast, GstRTSPTransport * ct) { GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; GSocket *rtcp_socket; gint tmp_rtp, tmp_rtcp; guint count; - gint rtpport, rtcpport; GList *rejected_addresses = NULL; GstRTSPAddress *addr = NULL; GInetAddress *inetaddr = NULL; - gchar *addr_str; GSocketAddress *rtp_sockaddr = NULL; GSocketAddress *rtcp_sockaddr = NULL; GstRTSPAddressPool *pool; - g_assert (!udpsrc_out[0]); - g_assert (!udpsrc_out[1]); - g_assert ((!udpsink_out[0] && !udpsink_out[1]) || - (udpsink_out[0] && udpsink_out[1])); - g_assert (*server_addr_out == NULL); - pool = priv->pool; count = 0; @@ -1262,7 +1344,7 @@ again: rejected_addresses = g_list_prepend (rejected_addresses, addr); if (!pool) - goto no_ports; + goto no_pool; flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; if (multicast) @@ -1278,11 +1360,13 @@ again: addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2); if (addr == NULL) - goto no_ports; + goto no_address; tmp_rtp = addr->port; g_clear_object (&inetaddr); + /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and + * socket control message set in udpsrc? */ if (multicast) inetaddr = g_inet_address_new_any (family); else @@ -1300,6 +1384,7 @@ again: rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp); if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) { + GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again"); g_object_unref (rtp_sockaddr); goto again; } @@ -1328,6 +1413,7 @@ again: rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp); if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) { + GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again"); g_object_unref (rtcp_sockaddr); g_clear_object (&rtp_socket); goto again; @@ -1341,62 +1427,42 @@ again: addr->n_ports = 2; } - addr_str = addr->address; g_clear_object (&inetaddr); - if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { - goto no_udp_protocol; - } - - g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); - g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); - - /* this should not happen... */ - if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) - goto port_error; - - /* This function is called twice (for v4 and v6) but we create only one pair - * of udpsinks. */ - if (!udpsink_out[0] - && !create_and_configure_udpsinks (stream, udpsink_out)) - goto no_udp_protocol; - - if (multicast) { - g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface", - priv->multicast_iface, NULL); - g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface", - priv->multicast_iface, NULL); - - g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL); - g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL); - } - - set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); - + socket_out[0] = rtp_socket; + socket_out[1] = rtcp_socket; *server_addr_out = addr; - g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); + GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d", addr->address, tmp_rtp, tmp_rtcp); - g_object_unref (rtp_socket); - g_object_unref (rtcp_socket); + g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); return TRUE; /* ERRORS */ no_udp_protocol: { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error"); goto cleanup; } -no_ports: +no_pool: { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no address pool specified"); goto cleanup; } -port_error: +no_address: { + GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); + goto cleanup; + } +no_ports: + { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports"); goto cleanup; } socket_error: { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error"); goto cleanup; } cleanup: @@ -1425,14 +1491,73 @@ cleanup: * Allocates RTP and RTCP ports. * * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated. - * Deprecated: This function shouldn't have been made public */ gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, - GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings) + GSocketFamily family, GstRTSPTransport * ct, + gboolean use_transport_settings) { - g_warn_if_reached (); - return FALSE; + GstRTSPStreamPrivate *priv; + gboolean ret = FALSE; + GstRTSPLowerTrans transport; + gboolean allocated = FALSE; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + g_return_val_if_fail (ct != NULL, FALSE); + priv = stream->priv; + + transport = ct->lower_transport; + + g_mutex_lock (&priv->lock); + + if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { + if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_addr_v4) + allocated = TRUE; + else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_addr_v6) + allocated = TRUE; + } else if (transport == GST_RTSP_LOWER_TRANS_UDP) { + if (family == G_SOCKET_FAMILY_IPV4 && priv->server_addr_v4) + allocated = TRUE; + else if (family == G_SOCKET_FAMILY_IPV6 && priv->server_addr_v6) + allocated = TRUE; + } + + if (allocated) { + g_mutex_unlock (&priv->lock); + return TRUE; + } + + if (family == G_SOCKET_FAMILY_IPV4) { + /* IPv4 */ + if (transport == GST_RTSP_LOWER_TRANS_UDP) { + /* UDP unicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->socket_v4, &priv->server_addr_v4, FALSE, ct); + } else { + /* multicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct); + } + } else { + /* IPv6 */ + if (transport == GST_RTSP_LOWER_TRANS_UDP) { + /* unicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->socket_v6, &priv->server_addr_v6, FALSE, ct); + + } else { + /* multicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct); + } + } + g_mutex_unlock (&priv->lock); + + return ret; } /** @@ -1483,33 +1608,6 @@ gst_rtsp_stream_is_client_side (GstRTSPStream * stream) return ret; } -/* must be called with lock */ -static gboolean -alloc_ports (GstRTSPStream * stream) -{ - GstRTSPStreamPrivate *priv = stream->priv; - gboolean ret = TRUE; - - if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) { - ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE); - - ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE); - } - - /* FIXME: Maybe actually consider the return values? */ - if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) { - ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE); - - ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE); - } - - return ret; -} - /** * gst_rtsp_stream_get_server_port: * @stream: a #GstRTSPStream @@ -2365,71 +2463,357 @@ on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ()); } +typedef struct _ProbeData ProbeData; + +struct _ProbeData +{ + GstRTSPStream *stream; + /* existing sink, already linked to tee */ + GstElement *sink1; + /* new sink, about to be linked */ + GstElement *sink2; + /* new queue element, that will be linked to tee and sink1 */ + GstElement **queue1; + /* new queue element, that will be linked to tee and sink2 */ + GstElement **queue2; + GstPad *sink_pad; + GstPad *tee_pad; + guint index; +}; + static void -plug_sink (GstBin * bin, GstElement * tee, GstElement * sink, - GstElement ** queue_out) +free_cb_data (gpointer user_data) { - GstPad *pad; - GstPad *teepad; - GstPad *queuepad; + ProbeData *data = user_data; + + gst_object_unref (data->stream); + gst_object_unref (data->sink1); + gst_object_unref (data->sink2); + gst_object_unref (data->sink_pad); + gst_object_unref (data->tee_pad); + g_free (data); +} + - gst_bin_add (bin, sink); +static void +create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream, GstElement *tee, + GstElement *sink, GstElement ** queue) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstPad *tee_pad; + GstPad *queue_pad; + GstPad *sink_pad; - *queue_out = gst_element_factory_make ("queue", NULL); - g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0, + /* create queue for the new stream */ + *queue = gst_element_factory_make ("queue", NULL); + g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); - gst_bin_add (bin, *queue_out); + gst_bin_add (priv->joined_bin, *queue); /* link tee to queue */ - teepad = gst_element_get_request_pad (tee, "src_%u"); - pad = gst_element_get_static_pad (*queue_out, "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); + tee_pad = gst_element_get_request_pad (tee, "src_%u"); + queue_pad = gst_element_get_static_pad (*queue, "sink"); + gst_pad_link (tee_pad, queue_pad); + gst_object_unref (queue_pad); + gst_object_unref (tee_pad); /* link queue to sink */ - queuepad = gst_element_get_static_pad (*queue_out, "src"); - pad = gst_element_get_static_pad (sink, "sink"); - gst_pad_link (queuepad, pad); - gst_object_unref (queuepad); - gst_object_unref (pad); + queue_pad = gst_element_get_static_pad (*queue, "src"); + sink_pad = gst_element_get_static_pad (sink, "sink"); + gst_pad_link (queue_pad, sink_pad); + gst_object_unref (queue_pad); + gst_object_unref (sink_pad); + + gst_element_sync_state_with_parent (sink); + gst_element_sync_state_with_parent (*queue); +} + +static GstPadProbeReturn +create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad, + GstPadProbeInfo * info, gpointer user_data) +{ + GstRTSPStreamPrivate *priv; + ProbeData *data = user_data; + GstRTSPStream *stream; + GstElement **queue1; + GstElement **queue2; + GstPad *sink_pad; + GstPad *tee_pad; + GstPad *queue_pad; + guint index; + + stream = data->stream; + priv = stream->priv; + queue1 = data->queue1; + queue2 = data->queue2; + sink_pad = data->sink_pad; + tee_pad = data->tee_pad; + index = data->index; + + /* unlink tee and the existing sink: + * .-----. .---------. + * | tee | | sink1 | + * sink src->sink | + * '-----' '---------' + */ + g_assert (gst_pad_unlink (tee_pad, sink_pad)); + + /* add queue to the already existing stream */ + *queue1 = gst_element_factory_make ("queue", NULL); + g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0, + "max-size-time", G_GINT64_CONSTANT (0), NULL); + gst_bin_add (priv->joined_bin, *queue1); + + /* link tee, queue and sink: + * .-----. .---------. .---------. + * | tee | | queue1 | | sink1 | + * sink src->sink src->sink | + * '-----' '---------' '---------' + */ + queue_pad = gst_element_get_static_pad (*queue1, "sink"); + gst_pad_link (tee_pad, queue_pad); + gst_object_unref (queue_pad); + queue_pad = gst_element_get_static_pad (*queue1, "src"); + gst_pad_link (queue_pad, sink_pad); + gst_object_unref (queue_pad); + + gst_element_sync_state_with_parent (*queue1); + + /* create queue and link it to tee and the new sink */ + create_and_plug_queue_to_unlinked_stream (stream, + priv->tee[index], data->sink2, queue2); + + /* the final stream: + * + * .-----. .---------. .---------. + * | tee | | queue1 | | sink1 | + * sink src->sink src->sink | + * | | '---------' '---------' + * | | .---------. .---------. + * | | | queue2 | | sink2 | + * | src->sink src->sink | + * '-----' '---------' '---------' + */ + + return GST_PAD_PROBE_REMOVE; +} + +static void +create_and_plug_queue_to_linked_stream (GstRTSPStream * stream, GstElement * sink1, + GstElement * sink2, guint index, GstElement ** queue1, + GstElement ** queue2) +{ + ProbeData *data; + + data = g_new0 (ProbeData, 1); + data->stream = gst_object_ref (stream); + data->sink1 = gst_object_ref (sink1); + data->sink2 = gst_object_ref (sink2); + data->queue1 = queue1; + data->queue2 = queue2; + data->index = index; + + data->sink_pad = gst_element_get_static_pad (sink1, "sink"); + g_assert (data->sink_pad); + data->tee_pad = gst_pad_get_peer (data->sink_pad); + g_assert (data->tee_pad); + + gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE, + create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data); +} + +static void +plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug, + GstElement ** queue_to_plug, guint index, gboolean is_mcast) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstElement *existing_sink; + + if (is_mcast) + existing_sink = priv->udpsink[index]; + else + existing_sink = priv->mcast_udpsink[index]; + + GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp"); + + /* add sink to the bin */ + gst_bin_add (priv->joined_bin, sink_to_plug); + + if (priv->appsink[index] && existing_sink) { + + /* queues are already added for the existing stream, add one for + the newly added udp stream */ + create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index], + sink_to_plug, queue_to_plug); + + } else if (priv->appsink[index] || existing_sink) { + GstElement **queue; + GstElement *element; + + /* add queue to the already existing stream plus the newly created udp + stream */ + if (priv->appsink[index]) { + element = priv->appsink[index]; + queue = &priv->appqueue[index]; + } else { + element = existing_sink; + if (is_mcast) + queue = &priv->udpqueue[index]; + else + queue = &priv->mcast_udpqueue[index]; + } + + create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug, index, + queue, queue_to_plug); + + } else { + GstPad *tee_pad; + GstPad *sink_pad; + + GST_DEBUG_OBJECT (stream, "creating first stream"); + + /* no need to add queues */ + tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u"); + sink_pad = gst_element_get_static_pad (sink_to_plug, "sink"); + gst_pad_link (tee_pad, sink_pad); + gst_object_unref (tee_pad); + gst_object_unref (sink_pad); + } + + gst_element_sync_state_with_parent (sink_to_plug); } -/* must be called with lock */ static void -create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) +plug_tcp_sink (GstRTSPStream * stream, guint index) +{ + GstRTSPStreamPrivate *priv = stream->priv; + + GST_DEBUG_OBJECT (stream, "plug tcp sink"); + + /* add sink to the bin */ + gst_bin_add (priv->joined_bin, priv->appsink[index]); + + if (priv->mcast_udpsink[index] && priv->udpsink[index]) { + + /* queues are already added for the existing stream, add one for + the newly added tcp stream */ + create_and_plug_queue_to_unlinked_stream (stream, + priv->tee[index], priv->appsink[index], &priv->appqueue[index]); + + } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) { + GstElement **queue; + GstElement *element; + + /* add queue to the already existing stream plus the newly created tcp + stream */ + if (priv->mcast_udpsink[index]) { + element = priv->mcast_udpsink[index]; + queue = &priv->mcast_udpqueue[index]; + } else { + element = priv->udpsink[index]; + queue = &priv->udpqueue[index]; + } + + create_and_plug_queue_to_linked_stream (stream, element, priv->appsink[index], index, + queue, &priv->appqueue[index]); + + } else { + GstPad *tee_pad; + GstPad *sink_pad; + + /* no need to add queues */ + tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u"); + sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink"); + gst_pad_link (tee_pad, sink_pad); + gst_object_unref (tee_pad); + gst_object_unref (sink_pad); + } + + gst_element_sync_state_with_parent (priv->appsink[index]); +} + +static void +plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport, + guint index) +{ + GstRTSPStreamPrivate *priv; + gboolean is_tcp, is_udp, is_mcast; + priv = stream->priv; + + is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; + is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; + is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; + + if (is_udp) + plug_udp_sink (stream, priv->udpsink[index], + &priv->udpqueue[index], index, FALSE); + + else if (is_mcast) + plug_udp_sink (stream, priv->mcast_udpsink[index], + &priv->mcast_udpqueue[index], index, TRUE); + + else if (is_tcp) + plug_tcp_sink (stream, index); +} + +/* must be called with lock */ +static gboolean +create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport) { GstRTSPStreamPrivate *priv; GstPad *pad; - gboolean is_tcp, is_udp; + GstBin *bin; + gboolean is_tcp, is_udp, is_mcast; gint i; + GST_DEBUG_OBJECT (stream, "create sender part"); priv = stream->priv; + bin = priv->joined_bin; + + is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; + is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; + is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; - is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP; - is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || - (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)); + GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d", is_tcp, is_udp, + is_mcast); + + if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) { + GST_WARNING_OBJECT (stream, "no sockets assigned for UDP"); + return FALSE; + } + + if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) { + GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast"); + return FALSE; + } for (i = 0; i < 2; i++) { + gboolean link_tee = FALSE; /* For the sender we create this bit of pipeline for both - * RTP and RTCP. Sync and preroll are enabled on udpsink so - * we need to add a queue before appsink and udpsink to make - * the pipeline not block. For the TCP case, we want to pump - * client as fast as possible anyway. This pipeline is used - * when both TCP and UDP are present. + * RTP and RTCP. + * Initially there will be only one active transport for + * the stream, so the pipeline will look like this: + * + * .--------. .-----. .---------. + * | rtpbin | | tee | | sink | + * | send->sink src->sink | + * '--------' '-----' '---------' + * + * For each new transport, the already existing branch will + * be reconfigured by adding a queue element: * * .--------. .-----. .---------. .---------. * | rtpbin | | tee | | queue | | udpsink | * | send->sink src->sink src->sink | * '--------' | | '---------' '---------' * | | .---------. .---------. + * | | | queue | | udpsink | + * | src->sink src->sink | + * | | '---------' '---------' + * | | .---------. .---------. * | | | queue | | appsink | * | src->sink src->sink | * '-----' '---------' '---------' - * - * When only UDP or only TCP is allowed, we skip the tee and queue - * and link the udpsink (for UDP) or appsink (for TCP) directly to - * the session. */ /* Only link the RTP send src if we're going to send RTP, link @@ -2437,71 +2821,49 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) if (!priv->srcpad && i == 0) continue; - if (is_tcp) { + if (!priv->tee[i]) { + /* make tee for RTP/RTCP */ + priv->tee[i] = gst_element_factory_make ("tee", NULL); + gst_bin_add (bin, priv->tee[i]); + link_tee = TRUE; + } + + if (is_udp && !priv->udpsink[i]) { + /* we create only one pair of udpsinks for IPv4 and IPv6 */ + create_and_configure_udpsink (stream, &priv->udpsink[i], priv->socket_v4[i], + priv->socket_v6[i], FALSE, (i == 0)); + plug_sink (stream, transport, i); + } else if (is_mcast && !priv->mcast_udpsink[i]) { + /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */ + create_and_configure_udpsink (stream, &priv->mcast_udpsink[i], + priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0)); + plug_sink (stream, transport, i); + } else if (is_tcp && !priv->appsink[i]) { /* make appsink */ priv->appsink[i] = gst_element_factory_make ("appsink", NULL); g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); + + /* we need to set sync and preroll to FALSE for the sink to avoid + * deadlock. This is only needed for sink sending RTCP data. */ + if (i == 1) + g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, + NULL); + gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), &sink_cb, stream, NULL); + plug_sink (stream, transport, i); } - /* If we have udp always use a tee because we could have mcast clients - * requesting different ports, in which case we'll have to plug more - * udpsinks. */ - if (is_udp) { - /* make tee for RTP/RTCP */ - priv->tee[i] = gst_element_factory_make ("tee", NULL); - gst_bin_add (bin, priv->tee[i]); - + if (link_tee) { /* and link to rtpbin send pad */ + gst_element_sync_state_with_parent (priv->tee[i]); pad = gst_element_get_static_pad (priv->tee[i], "sink"); gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - - if (priv->udpsink[i]) - plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); - - if (priv->mcast_udpsink[i]) - plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i], - &priv->mcast_udpqueue[i]); - - if (is_tcp) { - if (i == 1) - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]); - } - } else if (is_tcp) { - /* only appsink needed, link it to the session */ - gst_bin_add (bin, priv->appsink[i]); - pad = gst_element_get_static_pad (priv->appsink[i], "sink"); - gst_pad_link (priv->send_src[i], pad); - gst_object_unref (pad); - - /* when its only TCP, we need to set sync and preroll to FALSE - * for the sink to avoid deadlock. And this is only needed for - * sink used for RTCP data, not the RTP data. */ - if (i == 1) - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - } - - /* check if we need to set to a special state */ - if (state != GST_STATE_NULL) { - if (priv->udpsink[i]) - gst_element_set_state (priv->udpsink[i], state); - if (priv->mcast_udpsink[i]) - gst_element_set_state (priv->mcast_udpsink[i], state); - if (priv->appsink[i]) - gst_element_set_state (priv->appsink[i], state); - if (priv->appqueue[i]) - gst_element_set_state (priv->appqueue[i], state); - if (priv->udpqueue[i]) - gst_element_set_state (priv->udpqueue[i], state); - if (priv->mcast_udpqueue[i]) - gst_element_set_state (priv->mcast_udpqueue[i], state); - if (priv->tee[i]) - gst_element_set_state (priv->tee[i], state); } } + + return TRUE; } /* must be called with lock */ @@ -2533,30 +2895,48 @@ plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src, } /* must be called with lock */ -static void -create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) +static gboolean +create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport * + transport) { GstRTSPStreamPrivate *priv; GstPad *pad; - gboolean is_tcp; + GstBin *bin; + gboolean tcp; + gboolean udp; + gboolean mcast; gint i; + GST_DEBUG_OBJECT (stream, "create receiver part"); priv = stream->priv; + bin = priv->joined_bin; - is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP; + tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; + udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; + mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; for (i = 0; i < 2; i++) { /* For the receiver we create this bit of pipeline for both * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc * and it is all funneled into the rtpbin receive pad. * + * * .--------. .--------. .--------. * | udpsrc | | funnel | | rtpbin | - * | src->sink src->sink | + * | RTP src->sink src->sink | + * '--------' | | | | + * .--------. | | | | + * | appsrc | | | | | + * | RTP src->sink | | | + * '--------' '--------' | | + * | | + * .--------. .--------. | | + * | udpsrc | | funnel | | | + * | RTCP src->sink src->sink | * '--------' | | '--------' * .--------. | | * | appsrc | | | - * | src->sink | + * | RTCP src->sink | * '--------' '--------' */ @@ -2574,19 +2954,41 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_pad_link (pad, priv->recv_sink[i]); gst_object_unref (pad); - if (priv->udpsrc_v4[i]) + if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) { + GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->udpsrc_v4[i], + priv->socket_v4[i])) + goto udpsrc_error; + plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]); + } + + if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) { + GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->udpsrc_v6[i], + priv->socket_v6[i])) + goto udpsrc_error; - if (priv->udpsrc_v6[i]) plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); + } - if (priv->mcast_udpsrc_v4[i]) + if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) { + GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i], + priv->mcast_socket_v4[i])) + goto mcast_udpsrc_error; plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]); + } - if (priv->mcast_udpsrc_v6[i]) + if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) { + GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i], + priv->mcast_socket_v6[i])) + goto mcast_udpsrc_error; plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]); + } - if (is_tcp) { + if (tcp && !priv->appsrc[i]) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); priv->appsrc_base_time[i] = -1; @@ -2595,11 +2997,14 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]); } - /* check if we need to set to a special state */ - if (state != GST_STATE_NULL) { - gst_element_set_state (priv->funnel[i], state); - } + gst_element_sync_state_with_parent (priv->funnel[i]); } + + return TRUE; + +mcast_udpsrc_error: +udpsrc_error: + return FALSE; } static gboolean @@ -2688,9 +3093,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, GST_INFO ("stream %p joining bin as session %u", stream, idx); - if (!alloc_ports (stream)) - goto no_ports; - if (priv->profiles & GST_RTSP_PROFILE_SAVP || priv->profiles & GST_RTSP_PROFILE_SAVPF) { /* For SRTP */ @@ -2727,7 +3129,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); g_free (name); } else { - /* Need to connect our sinkpad from here */ + /* RECORD case: need to connect our sinkpad from here */ g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream); /* EOS */ g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream); @@ -2766,9 +3168,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, g_signal_connect (priv->session, "on-sender-ssrc-active", (GCallback) on_sender_ssrc_active, stream); - create_sender_part (stream, bin, state); - create_receiver_part (stream, bin, state); - if (priv->srcpad) { /* be notified of caps changes */ priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps", @@ -2777,6 +3176,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, } priv->joined_bin = bin; + GST_DEBUG_OBJECT (stream, "successfully joined bin"); g_mutex_unlock (&priv->lock); return TRUE; @@ -2787,12 +3187,6 @@ was_joined: g_mutex_unlock (&priv->lock); return TRUE; } -no_ports: - { - g_mutex_unlock (&priv->lock); - GST_WARNING ("failed to allocate ports %u", idx); - return FALSE; - } link_failed: { GST_WARNING ("failed to link stream %u", idx); @@ -3781,30 +4175,22 @@ pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) return GST_PAD_PROBE_OK; } -/** - * gst_rtsp_stream_set_blocked: - * @stream: a #GstRTSPStream - * @blocked: boolean indicating we should block or unblock - * - * Blocks or unblocks the dataflow on @stream. - * - * Returns: %TRUE on success - */ -gboolean -gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) +static void +set_blocked (GstRTSPStream * stream, gboolean blocked) { GstRTSPStreamPrivate *priv; int i; - g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + GST_DEBUG_OBJECT (stream, "blocked: %d", blocked); priv = stream->priv; - g_mutex_lock (&priv->lock); if (blocked) { priv->blocking = FALSE; for (i = 0; i < 2; i++) { - if (priv->blocked_id[i] == 0) { + if (priv->blocked_id[i] != 0) + continue; + if (priv->send_src[i]) { priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i], GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking, @@ -3820,6 +4206,51 @@ gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) } priv->blocking = FALSE; } +} + +/** + * gst_rtsp_stream_set_blocked: + * @stream: a #GstRTSPStream + * @blocked: boolean indicating we should block or unblock + * + * Blocks or unblocks the dataflow on @stream. + * + * Returns: %TRUE on success + */ +gboolean +gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + g_mutex_lock (&priv->lock); + set_blocked (stream, blocked); + g_mutex_unlock (&priv->lock); + + return TRUE; +} + +/** + * gst_rtsp_stream_ublock_linked: + * @stream: a #GstRTSPStream + * + * Unblocks the dataflow on @stream if it is linked. + * + * Returns: %TRUE on success + */ +gboolean +gst_rtsp_stream_unblock_linked (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + g_mutex_lock (&priv->lock); + if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0])) + set_blocked (stream, FALSE); g_mutex_unlock (&priv->lock); return TRUE; @@ -4015,3 +4446,50 @@ gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop) return TRUE; } + +/** + * gst_rtsp_stream_complete_stream: + * @stream: a #GstRTSPStream + * @transport: a #GstRTSPTransport + * + * Add a receiver and sender part to the pipeline based on the transport from + * SETUP. + * + * Returns: %TRUE if the pipeline has been sucessfully updated. + */ +gboolean +gst_rtsp_stream_complete_stream (GstRTSPStream * stream, + const GstRTSPTransport * transport) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + GST_DEBUG_OBJECT (stream, "complete stream"); + + g_mutex_lock (&priv->lock); + + if (!(priv->protocols & transport->lower_transport)) + goto unallowed_transport; + + if (!create_receiver_part (stream, transport)) + goto create_receiver_error; + + /* in the RECORD case, we only add RTCP sender part */ + if (!create_sender_part (stream, transport)) + goto create_sender_error; + + g_mutex_unlock (&priv->lock); + + GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated"); + return TRUE; + +create_receiver_error: +create_sender_error: +unallowed_transport: + { + g_mutex_unlock (&priv->lock); + return FALSE; + } +} diff --git a/gst/rtsp-server/rtsp-stream.h b/gst/rtsp-server/rtsp-stream.h index 7aa1aa2..add1a88 100644 --- a/gst/rtsp-server/rtsp-stream.h +++ b/gst/rtsp-server/rtsp-stream.h @@ -160,6 +160,10 @@ gboolean gst_rtsp_stream_set_blocked (GstRTSPStream * stream, GST_EXPORT gboolean gst_rtsp_stream_is_blocking (GstRTSPStream * stream); + +GST_EXPORT +gboolean gst_rtsp_stream_unblock_linked (GstRTSPStream * stream); + GST_EXPORT void gst_rtsp_stream_set_client_side (GstRTSPStream *stream, gboolean client_side); @@ -272,7 +276,7 @@ GstElement * gst_rtsp_stream_request_aux_sender (GstRTSPStream * st GST_EXPORT gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, GSocketFamily family, - GstRTSPTransport *transport, gboolean use_client_setttings); + GstRTSPTransport *transport, gboolean use_client_settings); GST_EXPORT void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream, GstRTSPPublishClockMode mode); @@ -280,6 +284,9 @@ void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * GST_EXPORT GstRTSPPublishClockMode gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream); +GST_EXPORT +gboolean gst_rtsp_stream_complete_stream (GstRTSPStream * stream, const GstRTSPTransport * transport); + /** * GstRTSPStreamTransportFilterFunc: * @stream: a #GstRTSPStream object diff --git a/tests/check/gst/client.c b/tests/check/gst/client.c index 7779664..4adf3d1 100644 --- a/tests/check/gst/client.c +++ b/tests/check/gst/client.c @@ -793,7 +793,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific) g_object_unref (session_pool); - /* simple SETUP with a valid URI and multicast, but an invalid prt */ fail_unless (gst_rtsp_message_init_request (&request, GST_RTSP_SETUP, "rtsp://localhost/test/stream=0") == GST_RTSP_OK); @@ -813,7 +812,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific) g_object_unref (session_pool); - /* simple SETUP with a valid URI and multicast, but an invalid ttl */ fail_unless (gst_rtsp_message_init_request (&request, GST_RTSP_SETUP, "rtsp://localhost/test/stream=0") == GST_RTSP_OK); @@ -832,7 +830,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific) fail_unless (gst_rtsp_session_pool_get_n_sessions (session_pool) == 0); g_object_unref (session_pool); - teardown_client (client); g_object_unref (ctx.auth); gst_rtsp_token_unref (ctx.token); diff --git a/tests/check/gst/media.c b/tests/check/gst/media.c index b0ddb19..cf09814 100644 --- a/tests/check/gst/media.c +++ b/tests/check/gst/media.c @@ -21,7 +21,73 @@ #include <rtsp-media-factory.h> -GST_START_TEST (test_launch) +GST_START_TEST (test_media_seek) +{ + GstRTSPMediaFactory *factory; + GstRTSPMedia *media; + GstRTSPUrl *url; + GstRTSPStream *stream; + GstRTSPTimeRange *range; + gchar *str; + GstRTSPThreadPool *pool; + GstRTSPThread *thread; + GstRTSPTransport *transport; + + factory = gst_rtsp_media_factory_new (); + fail_if (gst_rtsp_media_factory_is_shared (factory)); + fail_unless (gst_rtsp_url_parse ("rtsp://localhost:8554/test", + &url) == GST_RTSP_OK); + + gst_rtsp_media_factory_set_launch (factory, + "( videotestsrc ! rtpvrawpay pt=96 name=pay0 )"); + + media = gst_rtsp_media_factory_construct (factory, url); + fail_unless (GST_IS_RTSP_MEDIA (media)); + + fail_unless (gst_rtsp_media_n_streams (media) == 1); + + stream = gst_rtsp_media_get_stream (media, 0); + fail_unless (stream != NULL); + + pool = gst_rtsp_thread_pool_new (); + thread = gst_rtsp_thread_pool_get_thread (pool, + GST_RTSP_THREAD_TYPE_MEDIA, NULL); + + fail_unless (gst_rtsp_media_prepare (media, thread)); + + /* define transport */ + fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK); + transport->lower_transport = GST_RTSP_LOWER_TRANS_TCP; + + fail_unless (gst_rtsp_stream_complete_stream (stream, transport)); + + fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK); + fail_unless (gst_rtsp_range_parse ("npt=5.0-", &range) == GST_RTSP_OK); + + /* the media is seekable now */ + fail_unless (gst_rtsp_media_seek (media, range)); + + str = gst_rtsp_media_get_range_string (media, FALSE, GST_RTSP_RANGE_NPT); + fail_unless (g_str_equal (str, "npt=5-")); + + gst_rtsp_range_free (range); + g_free (str); + + fail_unless (gst_rtsp_media_unprepare (media)); + g_object_unref (media); + + gst_rtsp_url_free (url); + g_object_unref (factory); + + g_object_unref (pool); + + gst_rtsp_thread_pool_cleanup (); +} + +GST_END_TEST; + + +GST_START_TEST (test_media_seek_no_sinks) { GstRTSPMediaFactory *factory; GstRTSPMedia *media; @@ -70,15 +136,8 @@ GST_START_TEST (test_launch) fail_unless (g_str_equal (str, "npt=0-")); g_free (str); - fail_unless (gst_rtsp_media_seek (media, range)); - - str = gst_rtsp_media_get_range_string (media, FALSE, GST_RTSP_RANGE_NPT); - fail_unless (g_str_equal (str, "npt=5-")); - g_free (str); - - str = gst_rtsp_media_get_range_string (media, TRUE, GST_RTSP_RANGE_NPT); - fail_unless (g_str_equal (str, "npt=5-")); - g_free (str); + /* fails, need to be prepared and contain sink elements */ + fail_if (gst_rtsp_media_seek (media, range)); fail_unless (gst_rtsp_media_unprepare (media)); @@ -126,12 +185,13 @@ GST_START_TEST (test_media) GST_END_TEST; static void -test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line) +test_prepare_reusable (const gchar * launch_line) { GstRTSPMediaFactory *factory; GstRTSPMedia *media; GstRTSPUrl *url; GstRTSPThread *thread; + GstRTSPThreadPool *pool; factory = gst_rtsp_media_factory_new (); fail_if (gst_rtsp_media_factory_is_shared (factory)); @@ -146,6 +206,7 @@ test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line) g_object_set (G_OBJECT (media), "reusable", TRUE, NULL); + pool = gst_rtsp_thread_pool_new (); thread = gst_rtsp_thread_pool_get_thread (pool, GST_RTSP_THREAD_TYPE_MEDIA, NULL); fail_unless (gst_rtsp_media_prepare (media, thread)); @@ -162,6 +223,17 @@ test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line) g_object_unref (factory); } +GST_START_TEST (test_media_reusable) +{ + + /* test reusable media */ + test_prepare_reusable ("( videotestsrc ! rtpvrawpay pt=96 name=pay0 )"); + test_prepare_reusable ( + "( videotestsrc is-live=true ! rtpvrawpay pt=96 name=pay0 )"); +} + +GST_END_TEST; + GST_START_TEST (test_media_prepare) { GstRTSPMediaFactory *factory; @@ -199,11 +271,6 @@ GST_START_TEST (test_media_prepare) gst_rtsp_url_free (url); g_object_unref (factory); - /* test reusable media */ - test_prepare_reusable (pool, "( videotestsrc ! rtpvrawpay pt=96 name=pay0 )"); - test_prepare_reusable (pool, - "( videotestsrc is-live=true ! rtpvrawpay pt=96 name=pay0 )"); - g_object_unref (pool); gst_rtsp_thread_pool_cleanup (); } @@ -454,9 +521,11 @@ rtspmedia_suite (void) suite_add_tcase (s, tc); tcase_set_timeout (tc, 20); - tcase_add_test (tc, test_launch); + tcase_add_test (tc, test_media_seek); + tcase_add_test (tc, test_media_seek_no_sinks); tcase_add_test (tc, test_media); tcase_add_test (tc, test_media_prepare); + tcase_add_test (tc, test_media_reusable); tcase_add_test (tc, test_media_dyn_prepare); tcase_add_test (tc, test_media_take_pipeline); tcase_add_test (tc, test_media_reset); diff --git a/tests/check/gst/rtspserver.c b/tests/check/gst/rtspserver.c index 7f636f5..7ed1516 100644 --- a/tests/check/gst/rtspserver.c +++ b/tests/check/gst/rtspserver.c @@ -1059,6 +1059,74 @@ done: } static void +do_test_play_tcp_full (const gchar * range) +{ + GstRTSPConnection *conn; + GstSDPMessage *sdp_message = NULL; + const GstSDPMedia *sdp_media; + const gchar *video_control; + const gchar *audio_control; + GstRTSPRange client_port; + gchar *session = NULL; + GstRTSPTransport *video_transport = NULL; + GstRTSPTransport *audio_transport = NULL; + gchar *range_out = NULL; + GstRTSPLowerTrans lower_transport = GST_RTSP_LOWER_TRANS_TCP; + + conn = connect_to_server (test_port, TEST_MOUNT_POINT); + + sdp_message = do_describe (conn, TEST_MOUNT_POINT); + get_client_ports (&client_port); + + /* get control strings from DESCRIBE response */ + fail_unless (gst_sdp_message_medias_len (sdp_message) == 2); + sdp_media = gst_sdp_message_get_media (sdp_message, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + sdp_media = gst_sdp_message_get_media (sdp_message, 1); + audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + /* do SETUP for video and audio */ + fail_unless (do_setup_full (conn, video_control, lower_transport, + &client_port, NULL, &session, &video_transport, + NULL) == GST_RTSP_STS_OK); + fail_unless (do_setup_full (conn, audio_control, lower_transport, + &client_port, NULL, &session, &audio_transport, + NULL) == GST_RTSP_STS_OK); + + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_request (conn, GST_RTSP_PLAY, NULL, session, NULL, range, + NULL, NULL, NULL, NULL, NULL, &range_out) == GST_RTSP_STS_OK); + + if (range) + fail_unless_equals_string (range, range_out); + g_free (range_out); + + { + GstRTSPMessage *message; + fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK); + fail_unless (gst_rtsp_connection_receive (conn, message, NULL) == GST_RTSP_OK); + fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA); + gst_rtsp_message_free (message); + } + + /* send TEARDOWN request and check that we get 200 OK */ + fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN, + session) == GST_RTSP_STS_OK); + + /* FIXME: The rtsp-server always disconnects the transport before + * sending the RTCP BYE + * receive_rtcp (rtcp_socket, NULL, GST_RTCP_TYPE_BYE); + */ + + /* clean up and iterate so the clean-up can finish */ + g_free (session); + gst_rtsp_transport_free (video_transport); + gst_rtsp_transport_free (audio_transport); + gst_sdp_message_free (sdp_message); + gst_rtsp_connection_free (conn); +} + +static void do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport, GMutex * lock) { @@ -1579,6 +1647,70 @@ GST_START_TEST (test_no_session_timeout) GST_END_TEST; +/* media contains two streams: video and audio but only one + * stream is requested */ +GST_START_TEST (test_play_one_active_stream) +{ + GstRTSPConnection *conn; + GstSDPMessage *sdp_message = NULL; + const GstSDPMedia *sdp_media; + const gchar *video_control; + GstRTSPRange client_port; + gchar *session = NULL; + GstRTSPTransport *video_transport = NULL; + GstRTSPSessionPool *pool; + GstRTSPThreadPool *thread_pool; + + thread_pool = gst_rtsp_server_get_thread_pool (server); + gst_rtsp_thread_pool_set_max_threads (thread_pool, 2); + g_object_unref (thread_pool); + + pool = gst_rtsp_server_get_session_pool (server); + g_signal_connect (server, "client-connected", + G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one); + + start_server (FALSE); + + conn = connect_to_server (test_port, TEST_MOUNT_POINT); + + gst_rtsp_connection_set_remember_session_id (conn, FALSE); + + sdp_message = do_describe (conn, TEST_MOUNT_POINT); + + /* get control strings from DESCRIBE response */ + fail_unless (gst_sdp_message_medias_len (sdp_message) == 2); + sdp_media = gst_sdp_message_get_media (sdp_message, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + get_client_ports (&client_port); + + /* do SETUP for video only */ + fail_unless (do_setup (conn, video_control, &client_port, &session, + &video_transport) == GST_RTSP_STS_OK); + + fail_unless (gst_rtsp_session_pool_get_n_sessions (pool) == 1); + + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_simple_request (conn, GST_RTSP_PLAY, + session) == GST_RTSP_STS_OK); + + + /* send TEARDOWN request */ + fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN, + session) == GST_RTSP_STS_OK); + + /* clean up and iterate so the clean-up can finish */ + g_object_unref (pool); + g_free (session); + gst_rtsp_transport_free (video_transport); + gst_sdp_message_free (sdp_message); + gst_rtsp_connection_free (conn); + + stop_server (); + iterate (); +} + +GST_END_TEST; GST_START_TEST (test_play_disconnect) { @@ -1772,6 +1904,22 @@ GST_START_TEST (test_play_smpte_range) GST_END_TEST; +GST_START_TEST (test_play_smpte_range_tcp) +{ + start_tcp_server (); + + do_test_play_tcp_full ("npt=5-"); + do_test_play_tcp_full ("smpte=0:00:00-"); + do_test_play_tcp_full ("smpte=1:00:00-"); + do_test_play_tcp_full ("smpte=1:00:03-"); + do_test_play_tcp_full ("clock=20120321T152256Z-"); + + stop_server (); + iterate (); +} + +GST_END_TEST; + static gpointer thread_func (gpointer data) { @@ -2112,6 +2260,113 @@ GST_START_TEST (test_record_tcp) GST_END_TEST; +static void +do_test_multiple_transports (GstRTSPLowerTrans trans1, GstRTSPLowerTrans trans2) +{ + GstRTSPConnection *conn1; + GstRTSPConnection *conn2; + GstSDPMessage *sdp_message1 = NULL; + GstSDPMessage *sdp_message2 = NULL; + const GstSDPMedia *sdp_media; + const gchar *video_control; + const gchar *audio_control; + GstRTSPRange client_port1, client_port2; + gchar *session1 = NULL; + gchar *session2 = NULL; + GstRTSPTransport *video_transport = NULL; + GstRTSPTransport *audio_transport = NULL; + GSocket *rtp_socket, *rtcp_socket; + + conn1 = connect_to_server (test_port, TEST_MOUNT_POINT); + conn2 = connect_to_server (test_port, TEST_MOUNT_POINT); + + sdp_message1 = do_describe (conn1, TEST_MOUNT_POINT); + + get_client_ports_full (&client_port1, &rtp_socket, &rtcp_socket); + /* get control strings from DESCRIBE response */ + sdp_media = gst_sdp_message_get_media (sdp_message1, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + sdp_media = gst_sdp_message_get_media (sdp_message1, 1); + audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + /* do SETUP for video and audio */ + fail_unless (do_setup_full (conn1, video_control, trans1, + &client_port1, NULL, &session1, &video_transport, + NULL) == GST_RTSP_STS_OK); + fail_unless (do_setup_full (conn1, audio_control, trans1, + &client_port1, NULL, &session1, &audio_transport, + NULL) == GST_RTSP_STS_OK); + + gst_rtsp_transport_free (video_transport); + gst_rtsp_transport_free (audio_transport); + + sdp_message2 = do_describe (conn2, TEST_MOUNT_POINT); + + /* get control strings from DESCRIBE response */ + sdp_media = gst_sdp_message_get_media (sdp_message2, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + sdp_media = gst_sdp_message_get_media (sdp_message2, 1); + audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + get_client_ports_full (&client_port2, NULL, NULL); + /* do SETUP for video and audio */ + fail_unless (do_setup_full (conn2, video_control, trans2, + &client_port2, NULL, &session2, &video_transport, + NULL) == GST_RTSP_STS_OK); + fail_unless (do_setup_full (conn2, audio_control, trans2, + &client_port2, NULL, &session2, &audio_transport, + NULL) == GST_RTSP_STS_OK); + + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_request (conn1, GST_RTSP_PLAY, NULL, session1, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK); + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_request (conn2, GST_RTSP_PLAY, NULL, session2, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK); + + + /* receive UDP data */ + receive_rtp (rtp_socket, NULL); + receive_rtcp (rtcp_socket, NULL, 0); + + /* receive TCP data */ + { + GstRTSPMessage *message; + fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK); + fail_unless (gst_rtsp_connection_receive (conn2, message, NULL) == GST_RTSP_OK); + fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA); + gst_rtsp_message_free (message); + } + + /* send TEARDOWN request and check that we get 200 OK */ + fail_unless (do_simple_request (conn1, GST_RTSP_TEARDOWN, + session1) == GST_RTSP_STS_OK); + /* send TEARDOWN request and check that we get 200 OK */ + fail_unless (do_simple_request (conn2, GST_RTSP_TEARDOWN, + session2) == GST_RTSP_STS_OK); + + /* clean up and iterate so the clean-up can finish */ + g_object_unref (rtp_socket); + g_object_unref (rtcp_socket); + g_free (session1); + g_free (session2); + gst_rtsp_transport_free (video_transport); + gst_rtsp_transport_free (audio_transport); + gst_sdp_message_free (sdp_message1); + gst_sdp_message_free (sdp_message2); + gst_rtsp_connection_free (conn1); + gst_rtsp_connection_free (conn2); +} + +GST_START_TEST (test_multiple_transports) +{ + start_server (TRUE); + do_test_multiple_transports (GST_RTSP_LOWER_TRANS_UDP, GST_RTSP_LOWER_TRANS_TCP); + stop_server (); +} + +GST_END_TEST; + static Suite * rtspserver_suite (void) { @@ -2140,12 +2395,16 @@ rtspserver_suite (void) tcase_add_test (tc, test_play_multithreaded_timeout_client); tcase_add_test (tc, test_play_multithreaded_timeout_session); tcase_add_test (tc, test_no_session_timeout); + tcase_add_test (tc, test_play_one_active_stream); tcase_add_test (tc, test_play_disconnect); tcase_add_test (tc, test_play_specific_server_port); tcase_add_test (tc, test_play_smpte_range); + tcase_add_test (tc, test_play_smpte_range_tcp); tcase_add_test (tc, test_shared); tcase_add_test (tc, test_announce_without_sdp); tcase_add_test (tc, test_record_tcp); + tcase_add_test (tc, test_multiple_transports); + return s; } diff --git a/tests/check/gst/stream.c b/tests/check/gst/stream.c index 979c0d6..759b7be 100644 --- a/tests/check/gst/stream.c +++ b/tests/check/gst/stream.c @@ -22,7 +22,8 @@ #include <rtsp-stream.h> #include <rtsp-address-pool.h> -GST_START_TEST (test_get_sockets) +static void +get_sockets (GstRTSPLowerTrans lower_transport, GSocketFamily socket_family) { GstPad *srcpad; GstElement *pay; @@ -33,6 +34,7 @@ GST_START_TEST (test_get_sockets) GSocket *socket; gboolean have_ipv4; gboolean have_ipv6; + GstRTSPTransport *transport; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); fail_unless (srcpad != NULL); @@ -57,18 +59,44 @@ GST_START_TEST (test_get_sockets) fail_unless (gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV6, GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000, 60000, 0)); + fail_unless (gst_rtsp_address_pool_add_range (pool, "233.252.0.0", + "233.252.0.0", 50000, 60000, 1)); + fail_unless (gst_rtsp_address_pool_add_range (pool, "FF11:DB8::1", + "FF11:DB8::1", 50000, 60000, 1)); gst_rtsp_stream_set_address_pool (stream, pool); fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4); + /* allocate udp ports first */ + fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK); + transport->lower_transport = lower_transport; + + /* no ports allocated, complete stream should fail */ + fail_if (gst_rtsp_stream_complete_stream (stream, transport)); + + /* allocate ports */ + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + socket_family, transport, FALSE)); + + fail_unless (gst_rtsp_stream_complete_stream (stream, transport)); + fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK); + + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4); + else + socket = gst_rtsp_stream_get_rtp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV4); have_ipv4 = (socket != NULL); if (have_ipv4) { fail_unless (g_socket_get_fd (socket) >= 0); g_object_unref (socket); } - socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV4); + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV4); + else + socket = gst_rtsp_stream_get_rtcp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV4); if (have_ipv4) { fail_unless (socket != NULL); fail_unless (g_socket_get_fd (socket) >= 0); @@ -77,14 +105,22 @@ GST_START_TEST (test_get_sockets) fail_unless (socket == NULL); } - socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV6); + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV6); + else + socket = gst_rtsp_stream_get_rtp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV6); have_ipv6 = (socket != NULL); if (have_ipv6) { fail_unless (g_socket_get_fd (socket) >= 0); g_object_unref (socket); } - socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV6); + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV6); + else + socket = gst_rtsp_stream_get_rtcp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV6); if (have_ipv6) { fail_unless (socket != NULL); fail_unless (g_socket_get_fd (socket) >= 0); @@ -104,8 +140,25 @@ GST_START_TEST (test_get_sockets) gst_object_unref (stream); } +GST_START_TEST (test_get_sockets_udp) +{ + get_sockets (GST_RTSP_LOWER_TRANS_UDP, G_SOCKET_FAMILY_IPV4); + get_sockets (GST_RTSP_LOWER_TRANS_UDP, G_SOCKET_FAMILY_IPV6); +} + GST_END_TEST; +GST_START_TEST (test_get_sockets_mcast) +{ + get_sockets (GST_RTSP_LOWER_TRANS_UDP_MCAST, G_SOCKET_FAMILY_IPV4); + get_sockets (GST_RTSP_LOWER_TRANS_UDP_MCAST, G_SOCKET_FAMILY_IPV6); +} + +GST_END_TEST; + +/* The purpose of this test is to make sure that it's not possible to allocate + * multicast UDP ports if the address pool does not contain multicast UDP + * addresses. */ GST_START_TEST (test_allocate_udp_ports_fail) { GstPad *srcpad; @@ -114,6 +167,7 @@ GST_START_TEST (test_allocate_udp_ports_fail) GstBin *bin; GstElement *rtpbin; GstRTSPAddressPool *pool; + GstRTSPTransport *transport; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); fail_unless (srcpad != NULL); @@ -135,7 +189,13 @@ GST_START_TEST (test_allocate_udp_ports_fail) "192.168.1.1", 6000, 6001, 0)); gst_rtsp_stream_set_address_pool (stream, pool); - fail_if (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); + fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); + + fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK); + transport->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST; + fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, + transport, FALSE)); + fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); @@ -433,7 +493,8 @@ rtspstream_suite (void) TCase *tc = tcase_create ("general"); suite_add_tcase (s, tc); - tcase_add_test (tc, test_get_sockets); + tcase_add_test (tc, test_get_sockets_udp); + tcase_add_test (tc, test_get_sockets_mcast); tcase_add_test (tc, test_allocate_udp_ports_fail); tcase_add_test (tc, test_get_multicast_address); tcase_add_test (tc, test_multicast_address_and_unicast_udp); |