From f1088f368fd5a3d5b5f16896cea8d2f32a30fe30 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Mon, 18 Sep 2017 19:31:31 +0200 Subject: rtspclientsink: Use a mutex for protecting against concurrent send/receives This is a simple port of: * a722f6e8329032c6eda4865d6a07f4ba5981d7ea * c438545dc9e2f14f657bc0ef261fff726449867b * cd17c71dcea5c9310d21f1347c7520983e5869ac in gst-plugins-good. --- gst/rtsp-sink/gstrtspclientsink.c | 86 +++++++++++++++++++++++---------------- gst/rtsp-sink/gstrtspclientsink.h | 3 ++ 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/gst/rtsp-sink/gstrtspclientsink.c b/gst/rtsp-sink/gstrtspclientsink.c index b2aaec8..3df5e0d 100644 --- a/gst/rtsp-sink/gstrtspclientsink.c +++ b/gst/rtsp-sink/gstrtspclientsink.c @@ -666,6 +666,9 @@ gst_rtsp_client_sink_init (GstRTSPClientSink * sink) sink->state = GST_RTSP_STATE_INVALID; + g_mutex_init (&sink->conninfo.send_lock); + g_mutex_init (&sink->conninfo.recv_lock); + sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin"); gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE); gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin)); @@ -714,6 +717,9 @@ gst_rtsp_client_sink_finalize (GObject * object) g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock); g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock); + g_mutex_clear (&rtsp_client_sink->conninfo.send_lock); + g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock); + g_mutex_clear (&rtsp_client_sink->send_lock); g_mutex_clear (&rtsp_client_sink->preroll_lock); @@ -1137,6 +1143,9 @@ gst_rtsp_client_sink_request_new_pad (GstElement * element, (void) gst_rtsp_client_sink_get_factories (); + g_mutex_init (&context->conninfo.send_lock); + g_mutex_init (&context->conninfo.recv_lock); + GST_RTSP_STATE_LOCK (sink); sink->contexts = g_list_prepend (sink->contexts, context); GST_RTSP_STATE_UNLOCK (sink); @@ -1182,6 +1191,9 @@ gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad) g_free (context->conninfo.location); context->conninfo.location = NULL; + g_mutex_clear (&context->conninfo.send_lock); + g_mutex_clear (&context->conninfo.recv_lock); + g_free (context); gst_element_remove_pad (element, pad); @@ -1603,28 +1615,34 @@ gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink) static GstRTSPResult gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink, - GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout) + GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout) { GstRTSPResult ret; - if (conn) - ret = gst_rtsp_connection_send (conn, message, timeout); - else + if (conninfo->connection) { + g_mutex_lock (&conninfo->send_lock); + ret = gst_rtsp_connection_send (conninfo->connection, message, timeout); + g_mutex_unlock (&conninfo->send_lock); + } else { ret = GST_RTSP_ERROR; + } return ret; } static GstRTSPResult gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink, - GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout) + GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout) { GstRTSPResult ret; - if (conn) - ret = gst_rtsp_connection_receive (conn, message, timeout); - else + if (conninfo->connection) { + g_mutex_lock (&conninfo->recv_lock); + ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout); + g_mutex_unlock (&conninfo->recv_lock); + } else { ret = GST_RTSP_ERROR; + } return ret; } @@ -1793,7 +1811,7 @@ gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink, /* FIXME, handle server request, reply with OK, for now */ static GstRTSPResult gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink, - GstRTSPConnection * conn, GstRTSPMessage * request) + GstRTSPConnInfo * conninfo, GstRTSPMessage * request) { GstRTSPMessage response = { 0 }; GstRTSPResult res; @@ -1818,7 +1836,7 @@ gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink, if (sink->debug) gst_rtsp_message_dump (&response); - res = gst_rtsp_client_sink_connection_send (sink, conn, &response, NULL); + res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL); if (res < 0) goto send_error; @@ -1869,7 +1887,7 @@ gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink) gst_rtsp_message_dump (&request); res = - gst_rtsp_client_sink_connection_send (sink, sink->conninfo.connection, + gst_rtsp_client_sink_connection_send (sink, &sink->conninfo, &request, NULL); if (res < 0) goto send_error; @@ -1920,7 +1938,7 @@ gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink) * keep-alive request to keep the session open. */ res = gst_rtsp_client_sink_connection_receive (sink, - sink->conninfo.connection, &message, &tv_timeout); + &sink->conninfo, &message, &tv_timeout); switch (res) { case GST_RTSP_OK: @@ -1964,7 +1982,7 @@ gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink) /* server sends us a request message, handle it */ res = gst_rtsp_client_sink_handle_request (sink, - sink->conninfo.connection, &message); + &sink->conninfo, &message); if (res == GST_RTSP_EEOF) goto server_eof; else if (res < 0) @@ -2479,7 +2497,7 @@ no_user_pass: static GstRTSPResult gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink, - GstRTSPConnection * conn, GstRTSPMessage * request, + GstRTSPConnInfo * conninfo, GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPStatusCode * code) { GstRTSPResult res; @@ -2496,14 +2514,14 @@ again: g_mutex_lock (&sink->send_lock); res = - gst_rtsp_client_sink_connection_send (sink, conn, request, + gst_rtsp_client_sink_connection_send (sink, conninfo, request, sink->ptcp_timeout); if (res < 0) { g_mutex_unlock (&sink->send_lock); goto send_error; } - gst_rtsp_connection_reset_timeout (conn); + gst_rtsp_connection_reset_timeout (conninfo->connection); /* See if we should handle the response */ if (response == NULL) { @@ -2512,7 +2530,7 @@ again: } next: res = - gst_rtsp_client_sink_connection_receive (sink, conn, response, + gst_rtsp_client_sink_connection_receive (sink, conninfo, response, sink->ptcp_timeout); g_mutex_unlock (&sink->send_lock); @@ -2526,7 +2544,7 @@ next: switch (response->type) { case GST_RTSP_MESSAGE_REQUEST: - res = gst_rtsp_client_sink_handle_request (sink, conn, response); + res = gst_rtsp_client_sink_handle_request (sink, conninfo, response); if (res == GST_RTSP_EEOF) goto server_eof; else if (res < 0) @@ -2663,7 +2681,7 @@ gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state) * Returns: #GST_RTSP_OK if the processing was successful. */ static GstRTSPResult -gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn, +gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo, GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPStatusCode * code) { @@ -2685,7 +2703,7 @@ gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn, method = request->type_data.request.method; if ((res = - gst_rtsp_client_sink_try_send (sink, conn, request, response, + gst_rtsp_client_sink_try_send (sink, conninfo, request, response, &int_code)) < 0) goto error; @@ -2892,7 +2910,7 @@ gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink, ("Retrieving server options")); if ((res = - gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request, + gst_rtsp_client_sink_send (sink, &sink->conninfo, &request, &response, NULL)) < 0) goto send_error; @@ -3074,7 +3092,7 @@ gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async, GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream")); if ((res = - gst_rtsp_client_sink_send (sink, info->connection, &request, + gst_rtsp_client_sink_send (sink, info, &request, &response, NULL)) < 0) goto send_error; @@ -3486,7 +3504,7 @@ do_send_data (GstBuffer * buffer, guint8 channel, gst_rtsp_message_take_body (&message, map_info.data, map_info.size); res = - gst_rtsp_client_sink_try_send (sink, sink->conninfo.connection, &message, + gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message, NULL, NULL); gst_rtsp_message_steal_body (&message, &data, &usize); @@ -3534,7 +3552,7 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async) GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data; GstRTSPStream *stream; - GstRTSPConnection *conn; + GstRTSPConnInfo *info; GstRTSPProfile profiles; GstRTSPProfile cur_profile; gchar *transports; @@ -3571,14 +3589,14 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async) stream); continue; } - conn = context->conninfo.connection; + info = &context->conninfo; } else { - conn = sink->conninfo.connection; + info = &sink->conninfo; } GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream, context->conninfo.location); - conn_socket = gst_rtsp_connection_get_read_socket (conn); + conn_socket = gst_rtsp_connection_get_read_socket (info->connection); sa = g_socket_get_local_address (conn_socket, NULL); family = g_socket_address_get_family (sa); g_object_unref (sa); @@ -3649,7 +3667,7 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async) context->index)); /* handle the code ourselves */ - res = gst_rtsp_client_sink_send (sink, conn, &request, &response, &code); + res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code); if (res < 0) goto send_error; @@ -3981,7 +3999,7 @@ gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async) ("Sending server stream info")); if ((res = - gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request, + gst_rtsp_client_sink_send (sink, &sink->conninfo, &request, &response, NULL)) < 0) goto send_error; @@ -4016,7 +4034,7 @@ gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async) if (async) GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording")); if ((res = - gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request, + gst_rtsp_client_sink_send (sink, &sink->conninfo, &request, &response, NULL)) < 0) goto send_error; @@ -4108,7 +4126,7 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async) * aggregate control */ for (walk = sink->contexts; walk; walk = g_list_next (walk)) { GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data; - GstRTSPConnection *conn; + GstRTSPConnInfo *info; const gchar *setup_url; /* try aggregate control first but do non-aggregate control otherwise */ @@ -4118,9 +4136,9 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async) continue; if (sink->conninfo.connection) { - conn = sink->conninfo.connection; + info = &sink->conninfo; } else if (stream->conninfo.connection) { - conn = stream->conninfo.connection; + info = &stream->conninfo; } else { continue; } @@ -4135,7 +4153,7 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async) goto create_request_failed; if ((res = - gst_rtsp_client_sink_send (sink, conn, &request, &response, + gst_rtsp_client_sink_send (sink, info, &request, &response, NULL)) < 0) goto send_error; diff --git a/gst/rtsp-sink/gstrtspclientsink.h b/gst/rtsp-sink/gstrtspclientsink.h index a8aef5b..b27daf8 100644 --- a/gst/rtsp-sink/gstrtspclientsink.h +++ b/gst/rtsp-sink/gstrtspclientsink.h @@ -86,6 +86,9 @@ struct _GstRTSPConnInfo { GstRTSPConnection *connection; gboolean connected; gboolean flushing; + + GMutex send_lock; + GMutex recv_lock; }; typedef struct _GstRTSPStreamInfo GstRTSPStreamInfo; -- cgit v1.2.3