diff options
-rw-r--r-- | gst/rtsp-server/rtsp-client.c | 114 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-media.c | 109 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-media.h | 3 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-stream.c | 1008 | ||||
-rw-r--r-- | gst/rtsp-server/rtsp-stream.h | 9 | ||||
-rw-r--r-- | tests/check/gst/client.c | 3 | ||||
-rw-r--r-- | tests/check/gst/media.c | 103 | ||||
-rw-r--r-- | tests/check/gst/rtspserver.c | 259 | ||||
-rw-r--r-- | tests/check/gst/stream.c | 75 |
9 files changed, 1340 insertions, 343 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 45d178a..83131f3 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -1518,6 +1518,7 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx) gint matched; gchar *seek_style = NULL; GstRTSPStatusCode sig_result; + GPtrArray *transports; if (!(session = ctx->session)) goto no_session; @@ -1556,6 +1557,14 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx) if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY) goto invalid_state; + /* update the pipeline */ + transports = gst_rtsp_session_media_get_transports (sessmedia); + if (!gst_rtsp_media_complete_pipeline (media, transports)) { + g_ptr_array_unref (transports); + goto pipeline_error; + } + g_ptr_array_unref (transports); + /* in play we first unsuspend, media could be suspended from SDP or PAUSED */ if (!gst_rtsp_media_unsuspend (media)) goto unsuspend_failed; @@ -1666,6 +1675,13 @@ invalid_state: ctx); return FALSE; } +pipeline_error: + { + GST_ERROR ("client %p: failed to configure the pipeline", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + ctx); + return FALSE; + } unsuspend_failed: { GST_ERROR ("client %p: unsuspend failed", client); @@ -1784,39 +1800,52 @@ default_configure_client_transport (GstRTSPClient * client, GstRTSPClientPrivate *priv = client->priv; /* we have a valid transport now, set the destination of the client. */ - if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - gboolean use_client_settings; - - use_client_settings = - gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS); - - if (ct->destination && use_client_settings) { - GstRTSPAddress *addr; - - addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination, - ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl); - - if (addr == NULL) - goto no_address; - - gst_rtsp_address_free (addr); + if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST || + ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP) { + + /* allocate UDP ports */ + GSocketFamily family; + gboolean use_client_settings = FALSE; + + family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; + if ((ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) && + gst_rtsp_auth_check (GST_RTSP_AUTH_CHECK_TRANSPORT_CLIENT_SETTINGS) && + (ct->destination != NULL)) + use_client_settings = TRUE; + + if (!gst_rtsp_stream_allocate_udp_sockets (ctx->stream, family, ct, + use_client_settings)) + goto error_allocating_ports; + + if (ct->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { + GstRTSPAddress *addr = NULL; + + if (use_client_settings) { + /* the address has been successfully allocated, let's check if it's + * the one requested by the client */ + addr = gst_rtsp_stream_reserve_address (ctx->stream, ct->destination, + ct->port.min, ct->port.max - ct->port.min + 1, ct->ttl); + + if (addr == NULL) + goto no_address; + } else { + g_free (ct->destination); + addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family); + if (addr == NULL) + goto no_address; + ct->destination = g_strdup (addr->address); + ct->port.min = addr->port; + ct->port.max = addr->port + addr->n_ports - 1; + ct->ttl = addr->ttl; + + gst_rtsp_address_free (addr); + } } else { - GstRTSPAddress *addr; - GSocketFamily family; - - family = priv->is_ipv6 ? G_SOCKET_FAMILY_IPV6 : G_SOCKET_FAMILY_IPV4; - - addr = gst_rtsp_stream_get_multicast_address (ctx->stream, family); - if (addr == NULL) - goto no_address; + GstRTSPUrl *url; + url = gst_rtsp_connection_get_url (priv->connection); g_free (ct->destination); - ct->destination = g_strdup (addr->address); - ct->port.min = addr->port; - ct->port.max = addr->port + addr->n_ports - 1; - ct->ttl = addr->ttl; - - gst_rtsp_address_free (addr); + ct->destination = g_strdup (url->host); } } else { GstRTSPUrl *url; @@ -1863,9 +1892,14 @@ default_configure_client_transport (GstRTSPClient * client, return TRUE; /* ERRORS */ +error_allocating_ports: + { + GST_ERROR_OBJECT (client, "Failed to allocate UDP ports"); + return FALSE; + } no_address: { - GST_ERROR_OBJECT (client, "failed to acquire address for stream"); + GST_ERROR_OBJECT (client, "Failed to acquire address for stream"); return FALSE; } } @@ -3036,6 +3070,7 @@ handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx) gchar *path; gint matched; GstRTSPStatusCode sig_result; + GPtrArray *transports; if (!(session = ctx->session)) goto no_session; @@ -3074,7 +3109,15 @@ handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx) if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY) goto invalid_state; - /* in play we first unsuspend, media could be suspended from SDP or PAUSED */ + /* update the pipeline */ + transports = gst_rtsp_session_media_get_transports (sessmedia); + if (!gst_rtsp_media_complete_pipeline (media, transports)) { + g_ptr_array_unref (transports); + goto pipeline_error; + } + g_ptr_array_unref (transports); + + /* in record we first unsuspend, media could be suspended from SDP or PAUSED */ if (!gst_rtsp_media_unsuspend (media)) goto unsuspend_failed; @@ -3140,6 +3183,13 @@ invalid_state: ctx); return FALSE; } +pipeline_error: + { + GST_ERROR ("client %p: failed to configure the pipeline", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + ctx); + return FALSE; + } unsuspend_failed: { GST_ERROR ("client %p: unsuspend failed", client); diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index d3a1f5a..159dd3d 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -2115,6 +2115,21 @@ media_streams_set_blocked (GstRTSPMedia * media, gboolean blocked) } static void +stream_unblock (GstRTSPStream * stream, GstRTSPMedia * media) +{ + gst_rtsp_stream_unblock_linked (stream); +} + +static void +media_unblock_linked (GstRTSPMedia * media) +{ + GstRTSPMediaPrivate *priv = media->priv; + + GST_DEBUG ("media %p unblocking linked streams", media); + g_ptr_array_foreach (priv->streams, (GFunc) stream_unblock, media); +} + +static void gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status) { GstRTSPMediaPrivate *priv = media->priv; @@ -2526,8 +2541,6 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message) GST_INFO ("%p: ignoring ASYNC_DONE", media); } else { GST_INFO ("%p: got ASYNC_DONE", media); - collect_media_stats (media); - if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING) gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); } @@ -2642,6 +2655,9 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media) GST_WARNING ("failed to join bin element"); } + if (priv->blocked) + gst_rtsp_stream_set_blocked (stream, TRUE); + priv->adding = FALSE; g_rec_mutex_unlock (&priv->state_lock); @@ -2720,7 +2736,9 @@ start_preroll (GstRTSPMedia * media) GstStateChangeReturn ret; GST_INFO ("setting pipeline to PAUSED for media %p", media); - /* first go to PAUSED */ + + /* start blocked since it is possible that there are no sink elements yet */ + media_streams_set_blocked (media, TRUE); ret = set_target_state (media, GST_STATE_PAUSED, TRUE); switch (ret) { @@ -2737,10 +2755,7 @@ start_preroll (GstRTSPMedia * media) * seeking query in preroll instead */ priv->seekable = -1; priv->is_live = TRUE; - if (!(priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) { - /* start blocked to make sure nothing goes to the sink */ - media_streams_set_blocked (media, TRUE); - } + ret = set_state (media, GST_STATE_PLAYING); if (ret == GST_STATE_CHANGE_FAILURE) goto state_failed; @@ -3100,6 +3115,8 @@ finish_unprepare (GstRTSPMedia * media) set_state (media, GST_STATE_NULL); g_rec_mutex_lock (&priv->state_lock); + media_streams_set_blocked (media, FALSE); + if (priv->status != GST_RTSP_MEDIA_STATUS_UNPREPARING) return; @@ -3209,8 +3226,6 @@ gst_rtsp_media_unprepare (GstRTSPMedia * media) goto is_busy; GST_INFO ("unprepare media %p", media); - if (priv->blocked) - media_streams_set_blocked (media, FALSE); set_target_state (media, GST_STATE_NULL, FALSE); success = TRUE; @@ -3563,7 +3578,6 @@ default_suspend (GstRTSPMedia * media) { GstRTSPMediaPrivate *priv = media->priv; GstStateChangeReturn ret; - gboolean unblock = FALSE; switch (priv->suspend_mode) { case GST_RTSP_SUSPEND_MODE_NONE: @@ -3574,7 +3588,6 @@ default_suspend (GstRTSPMedia * media) ret = set_target_state (media, GST_STATE_PAUSED, TRUE); if (ret == GST_STATE_CHANGE_FAILURE) goto state_failed; - unblock = TRUE; break; case GST_RTSP_SUSPEND_MODE_RESET: GST_DEBUG ("media %p suspend to NULL", media); @@ -3587,16 +3600,11 @@ default_suspend (GstRTSPMedia * media) * is actually from NULL to PLAY will create a new sequence * number. */ g_ptr_array_foreach (priv->streams, (GFunc) do_set_seqnum, NULL); - unblock = TRUE; break; default: break; } - /* let the streams do the state changes freely, if any */ - if (unblock) - media_streams_set_blocked (media, FALSE); - return TRUE; /* ERRORS */ @@ -3674,7 +3682,19 @@ default_unsuspend (GstRTSPMedia * media) switch (priv->suspend_mode) { case GST_RTSP_SUSPEND_MODE_NONE: - gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); + if ((priv->transport_mode & GST_RTSP_TRANSPORT_MODE_RECORD)) + break; + gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING); + /* at this point the media pipeline has been updated and contain all + * specific transport parts: all active streams contain at least one sink + * element and it's safe to unblock any blocked streams that are active */ + media_unblock_linked (media); + g_rec_mutex_unlock (&priv->state_lock); + if (gst_rtsp_media_get_status (media) == GST_RTSP_MEDIA_STATUS_ERROR) { + g_rec_mutex_lock (&priv->state_lock); + goto preroll_failed; + } + g_rec_mutex_lock (&priv->state_lock); break; case GST_RTSP_SUSPEND_MODE_PAUSE: gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARED); @@ -3682,6 +3702,10 @@ default_unsuspend (GstRTSPMedia * media) case GST_RTSP_SUSPEND_MODE_RESET: { gst_rtsp_media_set_status (media, GST_RTSP_MEDIA_STATUS_PREPARING); + /* at this point the media pipeline has been updated and contain all + * specific transport parts: all active streams contain at least one sink + * element and it's safe to unblock any blocked streams that are active */ + media_unblock_linked (media); if (!start_preroll (media)) goto start_failed; @@ -3771,7 +3795,7 @@ media_set_pipeline_state_locked (GstRTSPMedia * media, GstState state) } else { if (state == GST_STATE_PLAYING) /* make sure pads are not blocking anymore when going to PLAYING */ - media_streams_set_blocked (media, FALSE); + media_unblock_linked (media); set_state (media, state); @@ -4007,3 +4031,52 @@ gst_rtsp_media_seekable (GstRTSPMedia * media) * and no stream is seekable only to the beginning */ return media->priv->seekable; } + +/** + * gst_rtsp_media_complete_pipeline: + * @media: a #GstRTSPMedia + * @transports: a list of #GstRTSPTransport + * + * Add a receiver and sender parts to the pipeline based on the transport from + * SETUP. + * + * Returns: %TRUE if the media pipeline has been sucessfully updated. + */ +gboolean +gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports) +{ + GstRTSPMediaPrivate *priv; + guint i; + + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); + g_return_val_if_fail (transports, FALSE); + + GST_DEBUG_OBJECT (media, "complete pipeline"); + + priv = media->priv; + + g_mutex_lock (&priv->lock); + for (i = 0; i < priv->streams->len; i++) { + GstRTSPStreamTransport *transport; + GstRTSPStream *stream; + const GstRTSPTransport *rtsp_transport; + + transport = g_ptr_array_index (transports, i); + if (!transport) + continue; + + stream = gst_rtsp_stream_transport_get_stream (transport); + if (!stream) + continue; + + rtsp_transport = gst_rtsp_stream_transport_get_transport (transport); + + if (!gst_rtsp_stream_complete_stream (stream, rtsp_transport)) { + g_mutex_unlock (&priv->lock); + return FALSE; + } + } + g_mutex_unlock (&priv->lock); + + return TRUE; +} diff --git a/gst/rtsp-server/rtsp-media.h b/gst/rtsp-server/rtsp-media.h index 9c63d8f..d78265d 100644 --- a/gst/rtsp-server/rtsp-media.h +++ b/gst/rtsp-server/rtsp-media.h @@ -382,6 +382,9 @@ GST_EXPORT void gst_rtsp_media_set_pipeline_state (GstRTSPMedia * media, GstState state); +GST_EXPORT +gboolean gst_rtsp_media_complete_pipeline (GstRTSPMedia * media, GPtrArray * transports); + #ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstRTSPMedia, gst_object_unref) #endif diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index ea4f034..9a8f1c2 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -99,12 +99,16 @@ struct _GstRTSPStreamPrivate GstElement *udpsrc_v6[2]; GstElement *udpqueue[2]; GstElement *udpsink[2]; + GSocket *socket_v4[2]; + GSocket *socket_v6[2]; /* for UDP multicast */ GstElement *mcast_udpsrc_v4[2]; GstElement *mcast_udpsrc_v6[2]; GstElement *mcast_udpqueue[2]; GstElement *mcast_udpsink[2]; + GSocket *mcast_socket_v4[2]; + GSocket *mcast_socket_v6[2]; /* for TCP transport */ GstElement *appsrc[2]; @@ -291,6 +295,23 @@ gst_rtsp_stream_finalize (GObject * obj) if (priv->rtxsend) g_object_unref (priv->rtxsend); + if (priv->socket_v4[0]) + g_object_unref (priv->socket_v4[0]); + if (priv->socket_v4[1]) + g_object_unref (priv->socket_v4[1]); + if (priv->socket_v6[0]) + g_object_unref (priv->socket_v6[0]); + if (priv->socket_v6[1]) + g_object_unref (priv->socket_v6[1]); + if (priv->mcast_socket_v4[0]) + g_object_unref (priv->mcast_socket_v4[0]); + if (priv->mcast_socket_v4[1]) + g_object_unref (priv->mcast_socket_v4[1]); + if (priv->mcast_socket_v6[0]) + g_object_unref (priv->mcast_socket_v6[0]); + if (priv->mcast_socket_v6[1]) + g_object_unref (priv->mcast_socket_v6[1]); + g_free (priv->multicast_iface); gst_object_unref (priv->payloader); @@ -588,18 +609,14 @@ gst_rtsp_stream_get_mtu (GstRTSPStream * stream) /* Update the dscp qos property on the udp sinks */ static void -update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2]) +update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink) { GstRTSPStreamPrivate *priv; priv = stream->priv; - if (udpsink[0]) { - g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL); - } - - if (udpsink[1]) { - g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL); + if (*udpsink) { + g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL); } } @@ -1087,8 +1104,8 @@ different_address: /* must be called with lock */ static void -set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket, - GSocket * rtcp_socket, GSocketFamily family) +set_socket_for_udpsink (GstElement * udpsink, GSocket * socket, + GSocketFamily family) { const gchar *multisink_socket; @@ -1097,95 +1114,172 @@ set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket, else multisink_socket = "socket"; - g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL); - g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL); + g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL); +} + +/* must be called with lock */ +static void +set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket, + GSocketFamily family, const gchar * multicast_iface, + const gchar * addr_str, gint port) +{ + set_socket_for_udpsink (udpsink, socket, family); + + if (multicast_iface) { + g_object_set (G_OBJECT (udpsink), "multicast-iface", + multicast_iface, NULL); + } + + g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL); +} + + +/* must be called with lock */ +static void +set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket, + GSocketFamily family) +{ + set_socket_for_udpsink (udpsink, socket, family); +} + +static guint16 +get_port_from_socket (GSocket * socket) +{ + guint16 port; + GSocketAddress *sockaddr; + GError *err; + + GST_DEBUG ("socket: %p", socket); + sockaddr = g_socket_get_local_address (socket, &err); + if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) { + g_clear_object (&sockaddr); + GST_ERROR ("failed to get sockaddr: %s", err->message); + g_error_free (err); + return 0; + } + + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr)); + g_object_unref (sockaddr); + + return port; } + static gboolean -create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2]) +create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink, + GSocket *socket_v4, GSocket *socket_v6, gboolean multicast, gboolean is_rtp) { GstRTSPStreamPrivate *priv = stream->priv; - GstElement *udpsink0, *udpsink1; - udpsink0 = gst_element_factory_make ("multiudpsink", NULL); - udpsink1 = gst_element_factory_make ("multiudpsink", NULL); + *udpsink = gst_element_factory_make ("multiudpsink", NULL); - if (!udpsink0 || !udpsink1) + if (!*udpsink) goto no_udp_protocol; /* configure sinks */ - g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL); + g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL); + g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL); + if (is_rtp) + g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL); + else + g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL); /* Needs to be async for RECORD streams, otherwise we will never go to * PLAYING because the sinks will wait for data while the udpsrc can't * provide data with timestamps in PAUSED. */ - if (priv->sinkpad) - g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); - - /* join multicast group when adding clients, so we'll start receiving from it. - * We cannot rely on the udpsrc to join the group since its socket is always a - * local unicast one. */ - g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL); - g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL); + if (!is_rtp || priv->sinkpad) + g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL); + if (multicast) { + /* join multicast group when adding clients, so we'll start receiving from it. + * We cannot rely on the udpsrc to join the group since its socket is always a + * local unicast one. */ + g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL); - udpsink[0] = udpsink0; - udpsink[1] = udpsink1; + g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL); + } /* update the dscp qos field in the sinks */ update_dscp_qos (stream, udpsink); + if (priv->server_addr_v4) { + GST_DEBUG_OBJECT (stream, + "udp IPv4, configure udpsinks"); + set_unicast_socket_for_udpsink (*udpsink, socket_v4, + G_SOCKET_FAMILY_IPV4); + } + + if (priv->server_addr_v6) { + GST_DEBUG_OBJECT (stream, + "udp IPv6, configure udpsinks"); + set_unicast_socket_for_udpsink (*udpsink, socket_v6, + G_SOCKET_FAMILY_IPV6); + } + + if (multicast) { + gint port; + if (priv->mcast_addr_v4) { + GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks"); + port = get_port_from_socket (socket_v4); + if (!port) + goto get_port_failed; + set_multicast_socket_for_udpsink (*udpsink, socket_v4, + G_SOCKET_FAMILY_IPV4, priv->multicast_iface, priv->mcast_addr_v4->address, port); + } + + if (priv->mcast_addr_v6) { + GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks"); + port = get_port_from_socket (socket_v6); + if (!port) + goto get_port_failed; + set_multicast_socket_for_udpsink (*udpsink, socket_v6, + G_SOCKET_FAMILY_IPV6, priv->multicast_iface, priv->mcast_addr_v6->address, port); + } + + } + return TRUE; /* ERRORS */ no_udp_protocol: { + GST_ERROR_OBJECT (stream, "failed to create udpsink element"); + return FALSE; + } +get_port_failed: + { + GST_ERROR_OBJECT (stream, "failed to get udp port"); return FALSE; } } /* must be called with lock */ static gboolean -create_and_configure_udpsources (GstElement * udpsrc_out[2], - GSocket * rtp_socket, GSocket * rtcp_socket) +create_and_configure_udpsource (GstElement ** udpsrc, + GSocket * socket) { GstStateChangeReturn ret; - udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL); - udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL); + g_assert (socket != NULL); - if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL) + *udpsrc = gst_element_factory_make ("udpsrc", NULL); + if (*udpsrc == NULL) goto error; - g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL); + g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL); /* The udpsrc cannot do the join because its socket is always a local unicast * one. The udpsink sharing the same socket will do it for us. */ - g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL); + g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL); + g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[0]), "close-socket", FALSE, NULL); - g_object_set (G_OBJECT (udpsrc_out[1]), "close-socket", FALSE, NULL); + g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL); - ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY); - if (ret == GST_STATE_CHANGE_FAILURE) - goto error; - ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY); + ret = gst_element_set_state (*udpsrc, GST_STATE_READY); if (ret == GST_STATE_CHANGE_FAILURE) goto error; @@ -1194,13 +1288,9 @@ create_and_configure_udpsources (GstElement * udpsrc_out[2], /* ERRORS */ error: { - if (udpsrc_out[0]) { - gst_element_set_state (udpsrc_out[0], GST_STATE_NULL); - g_clear_object (&udpsrc_out[0]); - } - if (udpsrc_out[1]) { - gst_element_set_state (udpsrc_out[1], GST_STATE_NULL); - g_clear_object (&udpsrc_out[1]); + if (*udpsrc) { + gst_element_set_state (*udpsrc, GST_STATE_NULL); + g_clear_object (udpsrc); } return FALSE; } @@ -1208,29 +1298,21 @@ error: static gboolean alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, - GstElement * udpsrc_out[2], GstElement * udpsink_out[2], - GstRTSPAddress ** server_addr_out, gboolean multicast) + GSocket *socket_out[2], GstRTSPAddress ** server_addr_out, + gboolean multicast, GstRTSPTransport * ct) { GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; GSocket *rtcp_socket; gint tmp_rtp, tmp_rtcp; guint count; - gint rtpport, rtcpport; GList *rejected_addresses = NULL; GstRTSPAddress *addr = NULL; GInetAddress *inetaddr = NULL; - gchar *addr_str; GSocketAddress *rtp_sockaddr = NULL; GSocketAddress *rtcp_sockaddr = NULL; GstRTSPAddressPool *pool; - g_assert (!udpsrc_out[0]); - g_assert (!udpsrc_out[1]); - g_assert ((!udpsink_out[0] && !udpsink_out[1]) || - (udpsink_out[0] && udpsink_out[1])); - g_assert (*server_addr_out == NULL); - pool = priv->pool; count = 0; @@ -1262,7 +1344,7 @@ again: rejected_addresses = g_list_prepend (rejected_addresses, addr); if (!pool) - goto no_ports; + goto no_pool; flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; if (multicast) @@ -1278,11 +1360,13 @@ again: addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2); if (addr == NULL) - goto no_ports; + goto no_address; tmp_rtp = addr->port; g_clear_object (&inetaddr); + /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and + * socket control message set in udpsrc? */ if (multicast) inetaddr = g_inet_address_new_any (family); else @@ -1300,6 +1384,7 @@ again: rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp); if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) { + GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again"); g_object_unref (rtp_sockaddr); goto again; } @@ -1328,6 +1413,7 @@ again: rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp); if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) { + GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again"); g_object_unref (rtcp_sockaddr); g_clear_object (&rtp_socket); goto again; @@ -1341,62 +1427,42 @@ again: addr->n_ports = 2; } - addr_str = addr->address; g_clear_object (&inetaddr); - if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) { - goto no_udp_protocol; - } - - g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL); - g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL); - - /* this should not happen... */ - if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) - goto port_error; - - /* This function is called twice (for v4 and v6) but we create only one pair - * of udpsinks. */ - if (!udpsink_out[0] - && !create_and_configure_udpsinks (stream, udpsink_out)) - goto no_udp_protocol; - - if (multicast) { - g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface", - priv->multicast_iface, NULL); - g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface", - priv->multicast_iface, NULL); - - g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL); - g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL); - } - - set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family); - + socket_out[0] = rtp_socket; + socket_out[1] = rtcp_socket; *server_addr_out = addr; - g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); + GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d", addr->address, tmp_rtp, tmp_rtcp); - g_object_unref (rtp_socket); - g_object_unref (rtcp_socket); + g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); return TRUE; /* ERRORS */ no_udp_protocol: { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error"); goto cleanup; } -no_ports: +no_pool: { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no address pool specified"); goto cleanup; } -port_error: +no_address: { + GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); + goto cleanup; + } +no_ports: + { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports"); goto cleanup; } socket_error: { + GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error"); goto cleanup; } cleanup: @@ -1425,14 +1491,73 @@ cleanup: * Allocates RTP and RTCP ports. * * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated. - * Deprecated: This function shouldn't have been made public */ gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, - GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings) + GSocketFamily family, GstRTSPTransport * ct, + gboolean use_transport_settings) { - g_warn_if_reached (); - return FALSE; + GstRTSPStreamPrivate *priv; + gboolean ret = FALSE; + GstRTSPLowerTrans transport; + gboolean allocated = FALSE; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + g_return_val_if_fail (ct != NULL, FALSE); + priv = stream->priv; + + transport = ct->lower_transport; + + g_mutex_lock (&priv->lock); + + if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { + if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_addr_v4) + allocated = TRUE; + else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_addr_v6) + allocated = TRUE; + } else if (transport == GST_RTSP_LOWER_TRANS_UDP) { + if (family == G_SOCKET_FAMILY_IPV4 && priv->server_addr_v4) + allocated = TRUE; + else if (family == G_SOCKET_FAMILY_IPV6 && priv->server_addr_v6) + allocated = TRUE; + } + + if (allocated) { + g_mutex_unlock (&priv->lock); + return TRUE; + } + + if (family == G_SOCKET_FAMILY_IPV4) { + /* IPv4 */ + if (transport == GST_RTSP_LOWER_TRANS_UDP) { + /* UDP unicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->socket_v4, &priv->server_addr_v4, FALSE, ct); + } else { + /* multicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, + priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct); + } + } else { + /* IPv6 */ + if (transport == GST_RTSP_LOWER_TRANS_UDP) { + /* unicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->socket_v6, &priv->server_addr_v6, FALSE, ct); + + } else { + /* multicast */ + GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6"); + ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, + priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct); + } + } + g_mutex_unlock (&priv->lock); + + return ret; } /** @@ -1483,33 +1608,6 @@ gst_rtsp_stream_is_client_side (GstRTSPStream * stream) return ret; } -/* must be called with lock */ -static gboolean -alloc_ports (GstRTSPStream * stream) -{ - GstRTSPStreamPrivate *priv = stream->priv; - gboolean ret = TRUE; - - if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) { - ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE); - - ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE); - } - - /* FIXME: Maybe actually consider the return values? */ - if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) { - ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, - priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE); - - ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, - priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE); - } - - return ret; -} - /** * gst_rtsp_stream_get_server_port: * @stream: a #GstRTSPStream @@ -2365,71 +2463,357 @@ on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ()); } +typedef struct _ProbeData ProbeData; + +struct _ProbeData +{ + GstRTSPStream *stream; + /* existing sink, already linked to tee */ + GstElement *sink1; + /* new sink, about to be linked */ + GstElement *sink2; + /* new queue element, that will be linked to tee and sink1 */ + GstElement **queue1; + /* new queue element, that will be linked to tee and sink2 */ + GstElement **queue2; + GstPad *sink_pad; + GstPad *tee_pad; + guint index; +}; + static void -plug_sink (GstBin * bin, GstElement * tee, GstElement * sink, - GstElement ** queue_out) +free_cb_data (gpointer user_data) { - GstPad *pad; - GstPad *teepad; - GstPad *queuepad; + ProbeData *data = user_data; + + gst_object_unref (data->stream); + gst_object_unref (data->sink1); + gst_object_unref (data->sink2); + gst_object_unref (data->sink_pad); + gst_object_unref (data->tee_pad); + g_free (data); +} + - gst_bin_add (bin, sink); +static void +create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream, GstElement *tee, + GstElement *sink, GstElement ** queue) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstPad *tee_pad; + GstPad *queue_pad; + GstPad *sink_pad; - *queue_out = gst_element_factory_make ("queue", NULL); - g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0, + /* create queue for the new stream */ + *queue = gst_element_factory_make ("queue", NULL); + g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); - gst_bin_add (bin, *queue_out); + gst_bin_add (priv->joined_bin, *queue); /* link tee to queue */ - teepad = gst_element_get_request_pad (tee, "src_%u"); - pad = gst_element_get_static_pad (*queue_out, "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); + tee_pad = gst_element_get_request_pad (tee, "src_%u"); + queue_pad = gst_element_get_static_pad (*queue, "sink"); + gst_pad_link (tee_pad, queue_pad); + gst_object_unref (queue_pad); + gst_object_unref (tee_pad); /* link queue to sink */ - queuepad = gst_element_get_static_pad (*queue_out, "src"); - pad = gst_element_get_static_pad (sink, "sink"); - gst_pad_link (queuepad, pad); - gst_object_unref (queuepad); - gst_object_unref (pad); + queue_pad = gst_element_get_static_pad (*queue, "src"); + sink_pad = gst_element_get_static_pad (sink, "sink"); + gst_pad_link (queue_pad, sink_pad); + gst_object_unref (queue_pad); + gst_object_unref (sink_pad); + + gst_element_sync_state_with_parent (sink); + gst_element_sync_state_with_parent (*queue); +} + +static GstPadProbeReturn +create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad, + GstPadProbeInfo * info, gpointer user_data) +{ + GstRTSPStreamPrivate *priv; + ProbeData *data = user_data; + GstRTSPStream *stream; + GstElement **queue1; + GstElement **queue2; + GstPad *sink_pad; + GstPad *tee_pad; + GstPad *queue_pad; + guint index; + + stream = data->stream; + priv = stream->priv; + queue1 = data->queue1; + queue2 = data->queue2; + sink_pad = data->sink_pad; + tee_pad = data->tee_pad; + index = data->index; + + /* unlink tee and the existing sink: + * .-----. .---------. + * | tee | | sink1 | + * sink src->sink | + * '-----' '---------' + */ + g_assert (gst_pad_unlink (tee_pad, sink_pad)); + + /* add queue to the already existing stream */ + *queue1 = gst_element_factory_make ("queue", NULL); + g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0, + "max-size-time", G_GINT64_CONSTANT (0), NULL); + gst_bin_add (priv->joined_bin, *queue1); + + /* link tee, queue and sink: + * .-----. .---------. .---------. + * | tee | | queue1 | | sink1 | + * sink src->sink src->sink | + * '-----' '---------' '---------' + */ + queue_pad = gst_element_get_static_pad (*queue1, "sink"); + gst_pad_link (tee_pad, queue_pad); + gst_object_unref (queue_pad); + queue_pad = gst_element_get_static_pad (*queue1, "src"); + gst_pad_link (queue_pad, sink_pad); + gst_object_unref (queue_pad); + + gst_element_sync_state_with_parent (*queue1); + + /* create queue and link it to tee and the new sink */ + create_and_plug_queue_to_unlinked_stream (stream, + priv->tee[index], data->sink2, queue2); + + /* the final stream: + * + * .-----. .---------. .---------. + * | tee | | queue1 | | sink1 | + * sink src->sink src->sink | + * | | '---------' '---------' + * | | .---------. .---------. + * | | | queue2 | | sink2 | + * | src->sink src->sink | + * '-----' '---------' '---------' + */ + + return GST_PAD_PROBE_REMOVE; +} + +static void +create_and_plug_queue_to_linked_stream (GstRTSPStream * stream, GstElement * sink1, + GstElement * sink2, guint index, GstElement ** queue1, + GstElement ** queue2) +{ + ProbeData *data; + + data = g_new0 (ProbeData, 1); + data->stream = gst_object_ref (stream); + data->sink1 = gst_object_ref (sink1); + data->sink2 = gst_object_ref (sink2); + data->queue1 = queue1; + data->queue2 = queue2; + data->index = index; + + data->sink_pad = gst_element_get_static_pad (sink1, "sink"); + g_assert (data->sink_pad); + data->tee_pad = gst_pad_get_peer (data->sink_pad); + g_assert (data->tee_pad); + + gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE, + create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data); +} + +static void +plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug, + GstElement ** queue_to_plug, guint index, gboolean is_mcast) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GstElement *existing_sink; + + if (is_mcast) + existing_sink = priv->udpsink[index]; + else + existing_sink = priv->mcast_udpsink[index]; + + GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp"); + + /* add sink to the bin */ + gst_bin_add (priv->joined_bin, sink_to_plug); + + if (priv->appsink[index] && existing_sink) { + + /* queues are already added for the existing stream, add one for + the newly added udp stream */ + create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index], + sink_to_plug, queue_to_plug); + + } else if (priv->appsink[index] || existing_sink) { + GstElement **queue; + GstElement *element; + + /* add queue to the already existing stream plus the newly created udp + stream */ + if (priv->appsink[index]) { + element = priv->appsink[index]; + queue = &priv->appqueue[index]; + } else { + element = existing_sink; + if (is_mcast) + queue = &priv->udpqueue[index]; + else + queue = &priv->mcast_udpqueue[index]; + } + + create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug, index, + queue, queue_to_plug); + + } else { + GstPad *tee_pad; + GstPad *sink_pad; + + GST_DEBUG_OBJECT (stream, "creating first stream"); + + /* no need to add queues */ + tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u"); + sink_pad = gst_element_get_static_pad (sink_to_plug, "sink"); + gst_pad_link (tee_pad, sink_pad); + gst_object_unref (tee_pad); + gst_object_unref (sink_pad); + } + + gst_element_sync_state_with_parent (sink_to_plug); } -/* must be called with lock */ static void -create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) +plug_tcp_sink (GstRTSPStream * stream, guint index) +{ + GstRTSPStreamPrivate *priv = stream->priv; + + GST_DEBUG_OBJECT (stream, "plug tcp sink"); + + /* add sink to the bin */ + gst_bin_add (priv->joined_bin, priv->appsink[index]); + + if (priv->mcast_udpsink[index] && priv->udpsink[index]) { + + /* queues are already added for the existing stream, add one for + the newly added tcp stream */ + create_and_plug_queue_to_unlinked_stream (stream, + priv->tee[index], priv->appsink[index], &priv->appqueue[index]); + + } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) { + GstElement **queue; + GstElement *element; + + /* add queue to the already existing stream plus the newly created tcp + stream */ + if (priv->mcast_udpsink[index]) { + element = priv->mcast_udpsink[index]; + queue = &priv->mcast_udpqueue[index]; + } else { + element = priv->udpsink[index]; + queue = &priv->udpqueue[index]; + } + + create_and_plug_queue_to_linked_stream (stream, element, priv->appsink[index], index, + queue, &priv->appqueue[index]); + + } else { + GstPad *tee_pad; + GstPad *sink_pad; + + /* no need to add queues */ + tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u"); + sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink"); + gst_pad_link (tee_pad, sink_pad); + gst_object_unref (tee_pad); + gst_object_unref (sink_pad); + } + + gst_element_sync_state_with_parent (priv->appsink[index]); +} + +static void +plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport, + guint index) +{ + GstRTSPStreamPrivate *priv; + gboolean is_tcp, is_udp, is_mcast; + priv = stream->priv; + + is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; + is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; + is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; + + if (is_udp) + plug_udp_sink (stream, priv->udpsink[index], + &priv->udpqueue[index], index, FALSE); + + else if (is_mcast) + plug_udp_sink (stream, priv->mcast_udpsink[index], + &priv->mcast_udpqueue[index], index, TRUE); + + else if (is_tcp) + plug_tcp_sink (stream, index); +} + +/* must be called with lock */ +static gboolean +create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport) { GstRTSPStreamPrivate *priv; GstPad *pad; - gboolean is_tcp, is_udp; + GstBin *bin; + gboolean is_tcp, is_udp, is_mcast; gint i; + GST_DEBUG_OBJECT (stream, "create sender part"); priv = stream->priv; + bin = priv->joined_bin; + + is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; + is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; + is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; - is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP; - is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) || - (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)); + GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d", is_tcp, is_udp, + is_mcast); + + if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) { + GST_WARNING_OBJECT (stream, "no sockets assigned for UDP"); + return FALSE; + } + + if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) { + GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast"); + return FALSE; + } for (i = 0; i < 2; i++) { + gboolean link_tee = FALSE; /* For the sender we create this bit of pipeline for both - * RTP and RTCP. Sync and preroll are enabled on udpsink so - * we need to add a queue before appsink and udpsink to make - * the pipeline not block. For the TCP case, we want to pump - * client as fast as possible anyway. This pipeline is used - * when both TCP and UDP are present. + * RTP and RTCP. + * Initially there will be only one active transport for + * the stream, so the pipeline will look like this: + * + * .--------. .-----. .---------. + * | rtpbin | | tee | | sink | + * | send->sink src->sink | + * '--------' '-----' '---------' + * + * For each new transport, the already existing branch will + * be reconfigured by adding a queue element: * * .--------. .-----. .---------. .---------. * | rtpbin | | tee | | queue | | udpsink | * | send->sink src->sink src->sink | * '--------' | | '---------' '---------' * | | .---------. .---------. + * | | | queue | | udpsink | + * | src->sink src->sink | + * | | '---------' '---------' + * | | .---------. .---------. * | | | queue | | appsink | * | src->sink src->sink | * '-----' '---------' '---------' - * - * When only UDP or only TCP is allowed, we skip the tee and queue - * and link the udpsink (for UDP) or appsink (for TCP) directly to - * the session. */ /* Only link the RTP send src if we're going to send RTP, link @@ -2437,71 +2821,49 @@ create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state) if (!priv->srcpad && i == 0) continue; - if (is_tcp) { + if (!priv->tee[i]) { + /* make tee for RTP/RTCP */ + priv->tee[i] = gst_element_factory_make ("tee", NULL); + gst_bin_add (bin, priv->tee[i]); + link_tee = TRUE; + } + + if (is_udp && !priv->udpsink[i]) { + /* we create only one pair of udpsinks for IPv4 and IPv6 */ + create_and_configure_udpsink (stream, &priv->udpsink[i], priv->socket_v4[i], + priv->socket_v6[i], FALSE, (i == 0)); + plug_sink (stream, transport, i); + } else if (is_mcast && !priv->mcast_udpsink[i]) { + /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */ + create_and_configure_udpsink (stream, &priv->mcast_udpsink[i], + priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0)); + plug_sink (stream, transport, i); + } else if (is_tcp && !priv->appsink[i]) { /* make appsink */ priv->appsink[i] = gst_element_factory_make ("appsink", NULL); g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); + + /* we need to set sync and preroll to FALSE for the sink to avoid + * deadlock. This is only needed for sink sending RTCP data. */ + if (i == 1) + g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, + NULL); + gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), &sink_cb, stream, NULL); + plug_sink (stream, transport, i); } - /* If we have udp always use a tee because we could have mcast clients - * requesting different ports, in which case we'll have to plug more - * udpsinks. */ - if (is_udp) { - /* make tee for RTP/RTCP */ - priv->tee[i] = gst_element_factory_make ("tee", NULL); - gst_bin_add (bin, priv->tee[i]); - + if (link_tee) { /* and link to rtpbin send pad */ + gst_element_sync_state_with_parent (priv->tee[i]); pad = gst_element_get_static_pad (priv->tee[i], "sink"); gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); - - if (priv->udpsink[i]) - plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]); - - if (priv->mcast_udpsink[i]) - plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i], - &priv->mcast_udpqueue[i]); - - if (is_tcp) { - if (i == 1) - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]); - } - } else if (is_tcp) { - /* only appsink needed, link it to the session */ - gst_bin_add (bin, priv->appsink[i]); - pad = gst_element_get_static_pad (priv->appsink[i], "sink"); - gst_pad_link (priv->send_src[i], pad); - gst_object_unref (pad); - - /* when its only TCP, we need to set sync and preroll to FALSE - * for the sink to avoid deadlock. And this is only needed for - * sink used for RTCP data, not the RTP data. */ - if (i == 1) - g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); - } - - /* check if we need to set to a special state */ - if (state != GST_STATE_NULL) { - if (priv->udpsink[i]) - gst_element_set_state (priv->udpsink[i], state); - if (priv->mcast_udpsink[i]) - gst_element_set_state (priv->mcast_udpsink[i], state); - if (priv->appsink[i]) - gst_element_set_state (priv->appsink[i], state); - if (priv->appqueue[i]) - gst_element_set_state (priv->appqueue[i], state); - if (priv->udpqueue[i]) - gst_element_set_state (priv->udpqueue[i], state); - if (priv->mcast_udpqueue[i]) - gst_element_set_state (priv->mcast_udpqueue[i], state); - if (priv->tee[i]) - gst_element_set_state (priv->tee[i], state); } } + + return TRUE; } /* must be called with lock */ @@ -2533,30 +2895,48 @@ plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src, } /* must be called with lock */ -static void -create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) +static gboolean +create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport * + transport) { GstRTSPStreamPrivate *priv; GstPad *pad; - gboolean is_tcp; + GstBin *bin; + gboolean tcp; + gboolean udp; + gboolean mcast; gint i; + GST_DEBUG_OBJECT (stream, "create receiver part"); priv = stream->priv; + bin = priv->joined_bin; - is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP; + tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; + udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; + mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; for (i = 0; i < 2; i++) { /* For the receiver we create this bit of pipeline for both * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc * and it is all funneled into the rtpbin receive pad. * + * * .--------. .--------. .--------. * | udpsrc | | funnel | | rtpbin | - * | src->sink src->sink | + * | RTP src->sink src->sink | + * '--------' | | | | + * .--------. | | | | + * | appsrc | | | | | + * | RTP src->sink | | | + * '--------' '--------' | | + * | | + * .--------. .--------. | | + * | udpsrc | | funnel | | | + * | RTCP src->sink src->sink | * '--------' | | '--------' * .--------. | | * | appsrc | | | - * | src->sink | + * | RTCP src->sink | * '--------' '--------' */ @@ -2574,19 +2954,41 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) gst_pad_link (pad, priv->recv_sink[i]); gst_object_unref (pad); - if (priv->udpsrc_v4[i]) + if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) { + GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->udpsrc_v4[i], + priv->socket_v4[i])) + goto udpsrc_error; + plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]); + } + + if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) { + GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->udpsrc_v6[i], + priv->socket_v6[i])) + goto udpsrc_error; - if (priv->udpsrc_v6[i]) plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); + } - if (priv->mcast_udpsrc_v4[i]) + if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) { + GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i], + priv->mcast_socket_v4[i])) + goto mcast_udpsrc_error; plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]); + } - if (priv->mcast_udpsrc_v6[i]) + if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) { + GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources"); + if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i], + priv->mcast_socket_v6[i])) + goto mcast_udpsrc_error; plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]); + } - if (is_tcp) { + if (tcp && !priv->appsrc[i]) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); priv->appsrc_base_time[i] = -1; @@ -2595,11 +2997,14 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state) plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]); } - /* check if we need to set to a special state */ - if (state != GST_STATE_NULL) { - gst_element_set_state (priv->funnel[i], state); - } + gst_element_sync_state_with_parent (priv->funnel[i]); } + + return TRUE; + +mcast_udpsrc_error: +udpsrc_error: + return FALSE; } static gboolean @@ -2688,9 +3093,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, GST_INFO ("stream %p joining bin as session %u", stream, idx); - if (!alloc_ports (stream)) - goto no_ports; - if (priv->profiles & GST_RTSP_PROFILE_SAVP || priv->profiles & GST_RTSP_PROFILE_SAVPF) { /* For SRTP */ @@ -2727,7 +3129,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); g_free (name); } else { - /* Need to connect our sinkpad from here */ + /* RECORD case: need to connect our sinkpad from here */ g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream); /* EOS */ g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream); @@ -2766,9 +3168,6 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, g_signal_connect (priv->session, "on-sender-ssrc-active", (GCallback) on_sender_ssrc_active, stream); - create_sender_part (stream, bin, state); - create_receiver_part (stream, bin, state); - if (priv->srcpad) { /* be notified of caps changes */ priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps", @@ -2777,6 +3176,7 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, } priv->joined_bin = bin; + GST_DEBUG_OBJECT (stream, "successfully joined bin"); g_mutex_unlock (&priv->lock); return TRUE; @@ -2787,12 +3187,6 @@ was_joined: g_mutex_unlock (&priv->lock); return TRUE; } -no_ports: - { - g_mutex_unlock (&priv->lock); - GST_WARNING ("failed to allocate ports %u", idx); - return FALSE; - } link_failed: { GST_WARNING ("failed to link stream %u", idx); @@ -3781,30 +4175,22 @@ pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) return GST_PAD_PROBE_OK; } -/** - * gst_rtsp_stream_set_blocked: - * @stream: a #GstRTSPStream - * @blocked: boolean indicating we should block or unblock - * - * Blocks or unblocks the dataflow on @stream. - * - * Returns: %TRUE on success - */ -gboolean -gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) +static void +set_blocked (GstRTSPStream * stream, gboolean blocked) { GstRTSPStreamPrivate *priv; int i; - g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + GST_DEBUG_OBJECT (stream, "blocked: %d", blocked); priv = stream->priv; - g_mutex_lock (&priv->lock); if (blocked) { priv->blocking = FALSE; for (i = 0; i < 2; i++) { - if (priv->blocked_id[i] == 0) { + if (priv->blocked_id[i] != 0) + continue; + if (priv->send_src[i]) { priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i], GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking, @@ -3820,6 +4206,51 @@ gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) } priv->blocking = FALSE; } +} + +/** + * gst_rtsp_stream_set_blocked: + * @stream: a #GstRTSPStream + * @blocked: boolean indicating we should block or unblock + * + * Blocks or unblocks the dataflow on @stream. + * + * Returns: %TRUE on success + */ +gboolean +gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + g_mutex_lock (&priv->lock); + set_blocked (stream, blocked); + g_mutex_unlock (&priv->lock); + + return TRUE; +} + +/** + * gst_rtsp_stream_ublock_linked: + * @stream: a #GstRTSPStream + * + * Unblocks the dataflow on @stream if it is linked. + * + * Returns: %TRUE on success + */ +gboolean +gst_rtsp_stream_unblock_linked (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + g_mutex_lock (&priv->lock); + if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0])) + set_blocked (stream, FALSE); g_mutex_unlock (&priv->lock); return TRUE; @@ -4015,3 +4446,50 @@ gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop) return TRUE; } + +/** + * gst_rtsp_stream_complete_stream: + * @stream: a #GstRTSPStream + * @transport: a #GstRTSPTransport + * + * Add a receiver and sender part to the pipeline based on the transport from + * SETUP. + * + * Returns: %TRUE if the pipeline has been sucessfully updated. + */ +gboolean +gst_rtsp_stream_complete_stream (GstRTSPStream * stream, + const GstRTSPTransport * transport) +{ + GstRTSPStreamPrivate *priv; + + g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); + + priv = stream->priv; + GST_DEBUG_OBJECT (stream, "complete stream"); + + g_mutex_lock (&priv->lock); + + if (!(priv->protocols & transport->lower_transport)) + goto unallowed_transport; + + if (!create_receiver_part (stream, transport)) + goto create_receiver_error; + + /* in the RECORD case, we only add RTCP sender part */ + if (!create_sender_part (stream, transport)) + goto create_sender_error; + + g_mutex_unlock (&priv->lock); + + GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated"); + return TRUE; + +create_receiver_error: +create_sender_error: +unallowed_transport: + { + g_mutex_unlock (&priv->lock); + return FALSE; + } +} diff --git a/gst/rtsp-server/rtsp-stream.h b/gst/rtsp-server/rtsp-stream.h index 7aa1aa2..add1a88 100644 --- a/gst/rtsp-server/rtsp-stream.h +++ b/gst/rtsp-server/rtsp-stream.h @@ -160,6 +160,10 @@ gboolean gst_rtsp_stream_set_blocked (GstRTSPStream * stream, GST_EXPORT gboolean gst_rtsp_stream_is_blocking (GstRTSPStream * stream); + +GST_EXPORT +gboolean gst_rtsp_stream_unblock_linked (GstRTSPStream * stream); + GST_EXPORT void gst_rtsp_stream_set_client_side (GstRTSPStream *stream, gboolean client_side); @@ -272,7 +276,7 @@ GstElement * gst_rtsp_stream_request_aux_sender (GstRTSPStream * st GST_EXPORT gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, GSocketFamily family, - GstRTSPTransport *transport, gboolean use_client_setttings); + GstRTSPTransport *transport, gboolean use_client_settings); GST_EXPORT void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream, GstRTSPPublishClockMode mode); @@ -280,6 +284,9 @@ void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * GST_EXPORT GstRTSPPublishClockMode gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream); +GST_EXPORT +gboolean gst_rtsp_stream_complete_stream (GstRTSPStream * stream, const GstRTSPTransport * transport); + /** * GstRTSPStreamTransportFilterFunc: * @stream: a #GstRTSPStream object diff --git a/tests/check/gst/client.c b/tests/check/gst/client.c index 7779664..4adf3d1 100644 --- a/tests/check/gst/client.c +++ b/tests/check/gst/client.c @@ -793,7 +793,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific) g_object_unref (session_pool); - /* simple SETUP with a valid URI and multicast, but an invalid prt */ fail_unless (gst_rtsp_message_init_request (&request, GST_RTSP_SETUP, "rtsp://localhost/test/stream=0") == GST_RTSP_OK); @@ -813,7 +812,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific) g_object_unref (session_pool); - /* simple SETUP with a valid URI and multicast, but an invalid ttl */ fail_unless (gst_rtsp_message_init_request (&request, GST_RTSP_SETUP, "rtsp://localhost/test/stream=0") == GST_RTSP_OK); @@ -832,7 +830,6 @@ GST_START_TEST (test_client_multicast_invalid_transport_specific) fail_unless (gst_rtsp_session_pool_get_n_sessions (session_pool) == 0); g_object_unref (session_pool); - teardown_client (client); g_object_unref (ctx.auth); gst_rtsp_token_unref (ctx.token); diff --git a/tests/check/gst/media.c b/tests/check/gst/media.c index b0ddb19..cf09814 100644 --- a/tests/check/gst/media.c +++ b/tests/check/gst/media.c @@ -21,7 +21,73 @@ #include <rtsp-media-factory.h> -GST_START_TEST (test_launch) +GST_START_TEST (test_media_seek) +{ + GstRTSPMediaFactory *factory; + GstRTSPMedia *media; + GstRTSPUrl *url; + GstRTSPStream *stream; + GstRTSPTimeRange *range; + gchar *str; + GstRTSPThreadPool *pool; + GstRTSPThread *thread; + GstRTSPTransport *transport; + + factory = gst_rtsp_media_factory_new (); + fail_if (gst_rtsp_media_factory_is_shared (factory)); + fail_unless (gst_rtsp_url_parse ("rtsp://localhost:8554/test", + &url) == GST_RTSP_OK); + + gst_rtsp_media_factory_set_launch (factory, + "( videotestsrc ! rtpvrawpay pt=96 name=pay0 )"); + + media = gst_rtsp_media_factory_construct (factory, url); + fail_unless (GST_IS_RTSP_MEDIA (media)); + + fail_unless (gst_rtsp_media_n_streams (media) == 1); + + stream = gst_rtsp_media_get_stream (media, 0); + fail_unless (stream != NULL); + + pool = gst_rtsp_thread_pool_new (); + thread = gst_rtsp_thread_pool_get_thread (pool, + GST_RTSP_THREAD_TYPE_MEDIA, NULL); + + fail_unless (gst_rtsp_media_prepare (media, thread)); + + /* define transport */ + fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK); + transport->lower_transport = GST_RTSP_LOWER_TRANS_TCP; + + fail_unless (gst_rtsp_stream_complete_stream (stream, transport)); + + fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK); + fail_unless (gst_rtsp_range_parse ("npt=5.0-", &range) == GST_RTSP_OK); + + /* the media is seekable now */ + fail_unless (gst_rtsp_media_seek (media, range)); + + str = gst_rtsp_media_get_range_string (media, FALSE, GST_RTSP_RANGE_NPT); + fail_unless (g_str_equal (str, "npt=5-")); + + gst_rtsp_range_free (range); + g_free (str); + + fail_unless (gst_rtsp_media_unprepare (media)); + g_object_unref (media); + + gst_rtsp_url_free (url); + g_object_unref (factory); + + g_object_unref (pool); + + gst_rtsp_thread_pool_cleanup (); +} + +GST_END_TEST; + + +GST_START_TEST (test_media_seek_no_sinks) { GstRTSPMediaFactory *factory; GstRTSPMedia *media; @@ -70,15 +136,8 @@ GST_START_TEST (test_launch) fail_unless (g_str_equal (str, "npt=0-")); g_free (str); - fail_unless (gst_rtsp_media_seek (media, range)); - - str = gst_rtsp_media_get_range_string (media, FALSE, GST_RTSP_RANGE_NPT); - fail_unless (g_str_equal (str, "npt=5-")); - g_free (str); - - str = gst_rtsp_media_get_range_string (media, TRUE, GST_RTSP_RANGE_NPT); - fail_unless (g_str_equal (str, "npt=5-")); - g_free (str); + /* fails, need to be prepared and contain sink elements */ + fail_if (gst_rtsp_media_seek (media, range)); fail_unless (gst_rtsp_media_unprepare (media)); @@ -126,12 +185,13 @@ GST_START_TEST (test_media) GST_END_TEST; static void -test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line) +test_prepare_reusable (const gchar * launch_line) { GstRTSPMediaFactory *factory; GstRTSPMedia *media; GstRTSPUrl *url; GstRTSPThread *thread; + GstRTSPThreadPool *pool; factory = gst_rtsp_media_factory_new (); fail_if (gst_rtsp_media_factory_is_shared (factory)); @@ -146,6 +206,7 @@ test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line) g_object_set (G_OBJECT (media), "reusable", TRUE, NULL); + pool = gst_rtsp_thread_pool_new (); thread = gst_rtsp_thread_pool_get_thread (pool, GST_RTSP_THREAD_TYPE_MEDIA, NULL); fail_unless (gst_rtsp_media_prepare (media, thread)); @@ -162,6 +223,17 @@ test_prepare_reusable (GstRTSPThreadPool * pool, const gchar * launch_line) g_object_unref (factory); } +GST_START_TEST (test_media_reusable) +{ + + /* test reusable media */ + test_prepare_reusable ("( videotestsrc ! rtpvrawpay pt=96 name=pay0 )"); + test_prepare_reusable ( + "( videotestsrc is-live=true ! rtpvrawpay pt=96 name=pay0 )"); +} + +GST_END_TEST; + GST_START_TEST (test_media_prepare) { GstRTSPMediaFactory *factory; @@ -199,11 +271,6 @@ GST_START_TEST (test_media_prepare) gst_rtsp_url_free (url); g_object_unref (factory); - /* test reusable media */ - test_prepare_reusable (pool, "( videotestsrc ! rtpvrawpay pt=96 name=pay0 )"); - test_prepare_reusable (pool, - "( videotestsrc is-live=true ! rtpvrawpay pt=96 name=pay0 )"); - g_object_unref (pool); gst_rtsp_thread_pool_cleanup (); } @@ -454,9 +521,11 @@ rtspmedia_suite (void) suite_add_tcase (s, tc); tcase_set_timeout (tc, 20); - tcase_add_test (tc, test_launch); + tcase_add_test (tc, test_media_seek); + tcase_add_test (tc, test_media_seek_no_sinks); tcase_add_test (tc, test_media); tcase_add_test (tc, test_media_prepare); + tcase_add_test (tc, test_media_reusable); tcase_add_test (tc, test_media_dyn_prepare); tcase_add_test (tc, test_media_take_pipeline); tcase_add_test (tc, test_media_reset); diff --git a/tests/check/gst/rtspserver.c b/tests/check/gst/rtspserver.c index 7f636f5..7ed1516 100644 --- a/tests/check/gst/rtspserver.c +++ b/tests/check/gst/rtspserver.c @@ -1059,6 +1059,74 @@ done: } static void +do_test_play_tcp_full (const gchar * range) +{ + GstRTSPConnection *conn; + GstSDPMessage *sdp_message = NULL; + const GstSDPMedia *sdp_media; + const gchar *video_control; + const gchar *audio_control; + GstRTSPRange client_port; + gchar *session = NULL; + GstRTSPTransport *video_transport = NULL; + GstRTSPTransport *audio_transport = NULL; + gchar *range_out = NULL; + GstRTSPLowerTrans lower_transport = GST_RTSP_LOWER_TRANS_TCP; + + conn = connect_to_server (test_port, TEST_MOUNT_POINT); + + sdp_message = do_describe (conn, TEST_MOUNT_POINT); + get_client_ports (&client_port); + + /* get control strings from DESCRIBE response */ + fail_unless (gst_sdp_message_medias_len (sdp_message) == 2); + sdp_media = gst_sdp_message_get_media (sdp_message, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + sdp_media = gst_sdp_message_get_media (sdp_message, 1); + audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + /* do SETUP for video and audio */ + fail_unless (do_setup_full (conn, video_control, lower_transport, + &client_port, NULL, &session, &video_transport, + NULL) == GST_RTSP_STS_OK); + fail_unless (do_setup_full (conn, audio_control, lower_transport, + &client_port, NULL, &session, &audio_transport, + NULL) == GST_RTSP_STS_OK); + + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_request (conn, GST_RTSP_PLAY, NULL, session, NULL, range, + NULL, NULL, NULL, NULL, NULL, &range_out) == GST_RTSP_STS_OK); + + if (range) + fail_unless_equals_string (range, range_out); + g_free (range_out); + + { + GstRTSPMessage *message; + fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK); + fail_unless (gst_rtsp_connection_receive (conn, message, NULL) == GST_RTSP_OK); + fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA); + gst_rtsp_message_free (message); + } + + /* send TEARDOWN request and check that we get 200 OK */ + fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN, + session) == GST_RTSP_STS_OK); + + /* FIXME: The rtsp-server always disconnects the transport before + * sending the RTCP BYE + * receive_rtcp (rtcp_socket, NULL, GST_RTCP_TYPE_BYE); + */ + + /* clean up and iterate so the clean-up can finish */ + g_free (session); + gst_rtsp_transport_free (video_transport); + gst_rtsp_transport_free (audio_transport); + gst_sdp_message_free (sdp_message); + gst_rtsp_connection_free (conn); +} + +static void do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport, GMutex * lock) { @@ -1579,6 +1647,70 @@ GST_START_TEST (test_no_session_timeout) GST_END_TEST; +/* media contains two streams: video and audio but only one + * stream is requested */ +GST_START_TEST (test_play_one_active_stream) +{ + GstRTSPConnection *conn; + GstSDPMessage *sdp_message = NULL; + const GstSDPMedia *sdp_media; + const gchar *video_control; + GstRTSPRange client_port; + gchar *session = NULL; + GstRTSPTransport *video_transport = NULL; + GstRTSPSessionPool *pool; + GstRTSPThreadPool *thread_pool; + + thread_pool = gst_rtsp_server_get_thread_pool (server); + gst_rtsp_thread_pool_set_max_threads (thread_pool, 2); + g_object_unref (thread_pool); + + pool = gst_rtsp_server_get_session_pool (server); + g_signal_connect (server, "client-connected", + G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one); + + start_server (FALSE); + + conn = connect_to_server (test_port, TEST_MOUNT_POINT); + + gst_rtsp_connection_set_remember_session_id (conn, FALSE); + + sdp_message = do_describe (conn, TEST_MOUNT_POINT); + + /* get control strings from DESCRIBE response */ + fail_unless (gst_sdp_message_medias_len (sdp_message) == 2); + sdp_media = gst_sdp_message_get_media (sdp_message, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + get_client_ports (&client_port); + + /* do SETUP for video only */ + fail_unless (do_setup (conn, video_control, &client_port, &session, + &video_transport) == GST_RTSP_STS_OK); + + fail_unless (gst_rtsp_session_pool_get_n_sessions (pool) == 1); + + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_simple_request (conn, GST_RTSP_PLAY, + session) == GST_RTSP_STS_OK); + + + /* send TEARDOWN request */ + fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN, + session) == GST_RTSP_STS_OK); + + /* clean up and iterate so the clean-up can finish */ + g_object_unref (pool); + g_free (session); + gst_rtsp_transport_free (video_transport); + gst_sdp_message_free (sdp_message); + gst_rtsp_connection_free (conn); + + stop_server (); + iterate (); +} + +GST_END_TEST; GST_START_TEST (test_play_disconnect) { @@ -1772,6 +1904,22 @@ GST_START_TEST (test_play_smpte_range) GST_END_TEST; +GST_START_TEST (test_play_smpte_range_tcp) +{ + start_tcp_server (); + + do_test_play_tcp_full ("npt=5-"); + do_test_play_tcp_full ("smpte=0:00:00-"); + do_test_play_tcp_full ("smpte=1:00:00-"); + do_test_play_tcp_full ("smpte=1:00:03-"); + do_test_play_tcp_full ("clock=20120321T152256Z-"); + + stop_server (); + iterate (); +} + +GST_END_TEST; + static gpointer thread_func (gpointer data) { @@ -2112,6 +2260,113 @@ GST_START_TEST (test_record_tcp) GST_END_TEST; +static void +do_test_multiple_transports (GstRTSPLowerTrans trans1, GstRTSPLowerTrans trans2) +{ + GstRTSPConnection *conn1; + GstRTSPConnection *conn2; + GstSDPMessage *sdp_message1 = NULL; + GstSDPMessage *sdp_message2 = NULL; + const GstSDPMedia *sdp_media; + const gchar *video_control; + const gchar *audio_control; + GstRTSPRange client_port1, client_port2; + gchar *session1 = NULL; + gchar *session2 = NULL; + GstRTSPTransport *video_transport = NULL; + GstRTSPTransport *audio_transport = NULL; + GSocket *rtp_socket, *rtcp_socket; + + conn1 = connect_to_server (test_port, TEST_MOUNT_POINT); + conn2 = connect_to_server (test_port, TEST_MOUNT_POINT); + + sdp_message1 = do_describe (conn1, TEST_MOUNT_POINT); + + get_client_ports_full (&client_port1, &rtp_socket, &rtcp_socket); + /* get control strings from DESCRIBE response */ + sdp_media = gst_sdp_message_get_media (sdp_message1, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + sdp_media = gst_sdp_message_get_media (sdp_message1, 1); + audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + /* do SETUP for video and audio */ + fail_unless (do_setup_full (conn1, video_control, trans1, + &client_port1, NULL, &session1, &video_transport, + NULL) == GST_RTSP_STS_OK); + fail_unless (do_setup_full (conn1, audio_control, trans1, + &client_port1, NULL, &session1, &audio_transport, + NULL) == GST_RTSP_STS_OK); + + gst_rtsp_transport_free (video_transport); + gst_rtsp_transport_free (audio_transport); + + sdp_message2 = do_describe (conn2, TEST_MOUNT_POINT); + + /* get control strings from DESCRIBE response */ + sdp_media = gst_sdp_message_get_media (sdp_message2, 0); + video_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + sdp_media = gst_sdp_message_get_media (sdp_message2, 1); + audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control"); + + get_client_ports_full (&client_port2, NULL, NULL); + /* do SETUP for video and audio */ + fail_unless (do_setup_full (conn2, video_control, trans2, + &client_port2, NULL, &session2, &video_transport, + NULL) == GST_RTSP_STS_OK); + fail_unless (do_setup_full (conn2, audio_control, trans2, + &client_port2, NULL, &session2, &audio_transport, + NULL) == GST_RTSP_STS_OK); + + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_request (conn1, GST_RTSP_PLAY, NULL, session1, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK); + /* send PLAY request and check that we get 200 OK */ + fail_unless (do_request (conn2, GST_RTSP_PLAY, NULL, session2, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK); + + + /* receive UDP data */ + receive_rtp (rtp_socket, NULL); + receive_rtcp (rtcp_socket, NULL, 0); + + /* receive TCP data */ + { + GstRTSPMessage *message; + fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK); + fail_unless (gst_rtsp_connection_receive (conn2, message, NULL) == GST_RTSP_OK); + fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA); + gst_rtsp_message_free (message); + } + + /* send TEARDOWN request and check that we get 200 OK */ + fail_unless (do_simple_request (conn1, GST_RTSP_TEARDOWN, + session1) == GST_RTSP_STS_OK); + /* send TEARDOWN request and check that we get 200 OK */ + fail_unless (do_simple_request (conn2, GST_RTSP_TEARDOWN, + session2) == GST_RTSP_STS_OK); + + /* clean up and iterate so the clean-up can finish */ + g_object_unref (rtp_socket); + g_object_unref (rtcp_socket); + g_free (session1); + g_free (session2); + gst_rtsp_transport_free (video_transport); + gst_rtsp_transport_free (audio_transport); + gst_sdp_message_free (sdp_message1); + gst_sdp_message_free (sdp_message2); + gst_rtsp_connection_free (conn1); + gst_rtsp_connection_free (conn2); +} + +GST_START_TEST (test_multiple_transports) +{ + start_server (TRUE); + do_test_multiple_transports (GST_RTSP_LOWER_TRANS_UDP, GST_RTSP_LOWER_TRANS_TCP); + stop_server (); +} + +GST_END_TEST; + static Suite * rtspserver_suite (void) { @@ -2140,12 +2395,16 @@ rtspserver_suite (void) tcase_add_test (tc, test_play_multithreaded_timeout_client); tcase_add_test (tc, test_play_multithreaded_timeout_session); tcase_add_test (tc, test_no_session_timeout); + tcase_add_test (tc, test_play_one_active_stream); tcase_add_test (tc, test_play_disconnect); tcase_add_test (tc, test_play_specific_server_port); tcase_add_test (tc, test_play_smpte_range); + tcase_add_test (tc, test_play_smpte_range_tcp); tcase_add_test (tc, test_shared); tcase_add_test (tc, test_announce_without_sdp); tcase_add_test (tc, test_record_tcp); + tcase_add_test (tc, test_multiple_transports); + return s; } diff --git a/tests/check/gst/stream.c b/tests/check/gst/stream.c index 979c0d6..759b7be 100644 --- a/tests/check/gst/stream.c +++ b/tests/check/gst/stream.c @@ -22,7 +22,8 @@ #include <rtsp-stream.h> #include <rtsp-address-pool.h> -GST_START_TEST (test_get_sockets) +static void +get_sockets (GstRTSPLowerTrans lower_transport, GSocketFamily socket_family) { GstPad *srcpad; GstElement *pay; @@ -33,6 +34,7 @@ GST_START_TEST (test_get_sockets) GSocket *socket; gboolean have_ipv4; gboolean have_ipv6; + GstRTSPTransport *transport; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); fail_unless (srcpad != NULL); @@ -57,18 +59,44 @@ GST_START_TEST (test_get_sockets) fail_unless (gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV6, GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000, 60000, 0)); + fail_unless (gst_rtsp_address_pool_add_range (pool, "233.252.0.0", + "233.252.0.0", 50000, 60000, 1)); + fail_unless (gst_rtsp_address_pool_add_range (pool, "FF11:DB8::1", + "FF11:DB8::1", 50000, 60000, 1)); gst_rtsp_stream_set_address_pool (stream, pool); fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); - socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4); + /* allocate udp ports first */ + fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK); + transport->lower_transport = lower_transport; + + /* no ports allocated, complete stream should fail */ + fail_if (gst_rtsp_stream_complete_stream (stream, transport)); + + /* allocate ports */ + fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, + socket_family, transport, FALSE)); + + fail_unless (gst_rtsp_stream_complete_stream (stream, transport)); + fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK); + + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4); + else + socket = gst_rtsp_stream_get_rtp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV4); have_ipv4 = (socket != NULL); if (have_ipv4) { fail_unless (g_socket_get_fd (socket) >= 0); g_object_unref (socket); } - socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV4); + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV4); + else + socket = gst_rtsp_stream_get_rtcp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV4); if (have_ipv4) { fail_unless (socket != NULL); fail_unless (g_socket_get_fd (socket) >= 0); @@ -77,14 +105,22 @@ GST_START_TEST (test_get_sockets) fail_unless (socket == NULL); } - socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV6); + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV6); + else + socket = gst_rtsp_stream_get_rtp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV6); have_ipv6 = (socket != NULL); if (have_ipv6) { fail_unless (g_socket_get_fd (socket) >= 0); g_object_unref (socket); } - socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV6); + if (lower_transport == GST_RTSP_LOWER_TRANS_UDP) + socket = gst_rtsp_stream_get_rtcp_socket (stream, G_SOCKET_FAMILY_IPV6); + else + socket = gst_rtsp_stream_get_rtcp_multicast_socket (stream, + G_SOCKET_FAMILY_IPV6); if (have_ipv6) { fail_unless (socket != NULL); fail_unless (g_socket_get_fd (socket) >= 0); @@ -104,8 +140,25 @@ GST_START_TEST (test_get_sockets) gst_object_unref (stream); } +GST_START_TEST (test_get_sockets_udp) +{ + get_sockets (GST_RTSP_LOWER_TRANS_UDP, G_SOCKET_FAMILY_IPV4); + get_sockets (GST_RTSP_LOWER_TRANS_UDP, G_SOCKET_FAMILY_IPV6); +} + GST_END_TEST; +GST_START_TEST (test_get_sockets_mcast) +{ + get_sockets (GST_RTSP_LOWER_TRANS_UDP_MCAST, G_SOCKET_FAMILY_IPV4); + get_sockets (GST_RTSP_LOWER_TRANS_UDP_MCAST, G_SOCKET_FAMILY_IPV6); +} + +GST_END_TEST; + +/* The purpose of this test is to make sure that it's not possible to allocate + * multicast UDP ports if the address pool does not contain multicast UDP + * addresses. */ GST_START_TEST (test_allocate_udp_ports_fail) { GstPad *srcpad; @@ -114,6 +167,7 @@ GST_START_TEST (test_allocate_udp_ports_fail) GstBin *bin; GstElement *rtpbin; GstRTSPAddressPool *pool; + GstRTSPTransport *transport; srcpad = gst_pad_new ("testsrcpad", GST_PAD_SRC); fail_unless (srcpad != NULL); @@ -135,7 +189,13 @@ GST_START_TEST (test_allocate_udp_ports_fail) "192.168.1.1", 6000, 6001, 0)); gst_rtsp_stream_set_address_pool (stream, pool); - fail_if (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); + fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL)); + + fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK); + transport->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST; + fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4, + transport, FALSE)); + fail_unless (gst_rtsp_transport_free (transport) == GST_RTSP_OK); g_object_unref (pool); fail_unless (gst_rtsp_stream_leave_bin (stream, bin, rtpbin)); @@ -433,7 +493,8 @@ rtspstream_suite (void) TCase *tc = tcase_create ("general"); suite_add_tcase (s, tc); - tcase_add_test (tc, test_get_sockets); + tcase_add_test (tc, test_get_sockets_udp); + tcase_add_test (tc, test_get_sockets_mcast); tcase_add_test (tc, test_allocate_udp_ports_fail); tcase_add_test (tc, test_get_multicast_address); tcase_add_test (tc, test_multicast_address_and_unicast_udp); |