summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Ashley <bugzilla@ashley-family.net>2019-02-05 16:34:40 +0000
committerTim-Philipp Müller <tim@centricular.com>2019-02-19 17:55:12 +0000
commitc2fe4e58ad1db67c1a6e2810a6ba067da97b4e80 (patch)
tree62380fc3607c82fc27ecb1488aa09368445229b9
parentd2d912f34a6fdc87fce319d93662b6437dd704ef (diff)
curlhttpsrc: fix various leaks and thread safety issues
curlhttpsrc uses a single thread running the gst_curl_http_src_curl_multi_loop() function to handle receiving data and messages from libcurl. Each instance of curlhttpsrc adds an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits for the multi_loop to perform the HTTP request. Valgrind has shown up race conditions and memory leaks: 1. gst_curl_http_src_change_state() does not wait for the multi_loop to complete before going to the NULL state, which means that an instance of GstCurlHttpSrc can be released while gst_curl_http_src_curl_multi_loop() still has a reference to it. 2. if multiple elements try to be removed from the queue at once, only the last one is deleted. 3. source->caps is leaked 4. curl multi_handle is leaked 5. leak of curl_handle if URI not set 6. leak of http_headers when reusing element 7. null pointer dereference in negotiate caps 8. double-free of the default user-agent string 9. leak of multi_task_context.task This commit changes the logic so that each element has a connection status, which is used by the multi_loop to decide when to remove an element from its queue. An instance of curlhttpsrc will not enter the NULL state until its reference has been removed from the queue. When shutting down the curl multi loop, the memory allocated from the call to curl_multi_init() is now released. When gstadaptivedemux uses a URI source element, it will re-use it for multiple requests, moving it between READY and PLAYING between each request. curlhttpsrc was leaking the http_headers structure in this use case. The gst_curl_http_src_negotiate_caps() function extracts the "response-headers" field from the http_headers, but did not check that this field might be NULL. If the user-agent property is set, the global user-agent string was freed. This caused a double-free error if the user-agent is ever set a second time during the execution of the process. There are situations within curlhttpsrc where the code needs both the global multi_task_context mutex and the per-element buffer_mutex. To avoid deadlocks, it is vital that the order in which these are requested is always the same. This commit modifies the locking order to always be in the order: 1. multi_task_context.task_rec_mutex 2. buffer_mutex Fixes #876
-rw-r--r--ext/curl/gstcurldefaults.h2
-rw-r--r--ext/curl/gstcurlhttpsrc.c479
-rw-r--r--ext/curl/gstcurlhttpsrc.h43
-rw-r--r--ext/curl/gstcurlqueue.c7
-rw-r--r--ext/curl/gstcurlqueue.h2
5 files changed, 303 insertions, 230 deletions
diff --git a/ext/curl/gstcurldefaults.h b/ext/curl/gstcurldefaults.h
index 8e687f749..2f4bc8393 100644
--- a/ext/curl/gstcurldefaults.h
+++ b/ext/curl/gstcurldefaults.h
@@ -60,7 +60,7 @@
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY ((void *)0)
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME ((void *)0)
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD ((void *)0)
-#define GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT gst_curl_http_src_default_useragent
+#define GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "GStreamer curlhttpsrc libcurl"
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING FALSE
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION 1L
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS -1
diff --git a/ext/curl/gstcurlhttpsrc.c b/ext/curl/gstcurlhttpsrc.c
index c1a0bcf5c..9a2dbb0d5 100644
--- a/ext/curl/gstcurlhttpsrc.c
+++ b/ext/curl/gstcurlhttpsrc.c
@@ -73,6 +73,46 @@
* </refsect2>
*/
+/*
+ * Thread safety notes.
+ *
+ * GstCurlHttpSrc uses a single thread running the
+ * gst_curl_http_src_curl_multi_loop() function to handle receiving
+ * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
+ * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
+ * for the multi_loop to perform the HTTP request.
+ *
+ * When an instance of GstCurlHttpSrc wants to make a request (i.e.
+ * it has moved to the PLAYING state) it adds itself to the
+ * multi_task_context.queue list and signals the multi_loop task.
+ *
+ * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
+ * to wait for gst_curl_http_src_curl_multi_loop() to perform the
+ * request and signal completion.
+ *
+ * Each instance of GstCurlHttpSrc is protected by the mutexes:
+ * 1. uri_mutex
+ * 2. buffer_mutex
+ *
+ * uri_mutex is used to protect access to the uri field.
+ *
+ * buffer_mutex is used to protect access to buffer_cond, state and
+ * connection_status.
+ *
+ * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
+ * 1. multi_task_context.task_rec_mutex
+ * 2. multi_task_context.mutex
+ *
+ * multi_task_context.task_rec_mutex is only used by GstTask.
+ *
+ * multi_task_context.mutex is used to protect access to queue and state
+ *
+ * To avoid deadlock, it is vital that if both multi_task_context.mutex
+ * and buffer_mutex are required, that they are locked in the order:
+ * 1. multi_task_context.mutex
+ * 2. buffer_mutex
+ */
+
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
@@ -87,6 +127,35 @@ GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
#define GST_CAT_DEFAULT gst_curl_http_src_debug
GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
+enum
+{
+ PROP_0,
+ PROP_URI,
+ PROP_USERNAME,
+ PROP_PASSWORD,
+ PROP_PROXYURI,
+ PROP_PROXYUSERNAME,
+ PROP_PROXYPASSWORD,
+ PROP_COOKIES,
+ PROP_USERAGENT,
+ PROP_HEADERS,
+ PROP_COMPRESS,
+ PROP_REDIRECT,
+ PROP_MAXREDIRECT,
+ PROP_KEEPALIVE,
+ PROP_TIMEOUT,
+ PROP_STRICT_SSL,
+ PROP_SSL_CA_FILE,
+ PROP_RETRIES,
+ PROP_CONNECTIONMAXTIME,
+ PROP_MAXCONCURRENT_SERVER,
+ PROP_MAXCONCURRENT_PROXY,
+ PROP_MAXCONCURRENT_GLOBAL,
+ PROP_HTTPVERSION,
+ PROP_IRADIO_MODE,
+ PROP_MAX
+};
+
/*
* Make a source pad template to be able to kick out recv'd data
*/
@@ -138,14 +207,15 @@ static size_t gst_curl_http_src_get_header (void *header, size_t size,
static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
size_t nmemb, void *src);
static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
+static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
static char *gst_curl_http_src_strcasestr (const char *haystack,
const char *needle);
-curl_version_info_data *gst_curl_http_src_curl_capabilities;
-GstCurlHttpVersion pref_http_ver;
-gchar *gst_curl_http_src_default_useragent;
+static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
+static GstCurlHttpVersion pref_http_ver;
#define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
+
static GType
gst_curl_http_version_get_type (void)
{
@@ -237,10 +307,6 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
pref_http_ver = default_http_version;
}
- gst_curl_http_src_default_useragent =
- g_strdup_printf ("GStreamer curlhttpsrc libcurl/%s",
- gst_curl_http_src_curl_capabilities->version);
-
gobject_class->set_property = gst_curl_http_src_set_property;
gobject_class->get_property = gst_curl_http_src_get_property;
gobject_class->finalize = gst_curl_http_src_finalize;
@@ -285,7 +351,8 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
g_object_class_install_property (gobject_class, PROP_USERAGENT,
g_param_spec_string ("user-agent", "User-Agent",
- "URI of resource requested", GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT,
+ "URI of resource requested",
+ GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_COMPRESS,
@@ -390,9 +457,13 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
__LINE__, NULL, "Testing the curl_multi_loop debugging prints");
#endif
+ klass->multi_task_context.task = NULL;
+ klass->multi_task_context.refcount = 0;
+ klass->multi_task_context.queue = NULL;
+ klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
+ klass->multi_task_context.multi_handle = NULL;
g_mutex_init (&klass->multi_task_context.mutex);
g_cond_init (&klass->multi_task_context.signal);
- g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
gst_element_class_set_static_metadata (gstelement_class,
"HTTP Client Source using libcURL",
@@ -410,8 +481,10 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_URI:
+ g_mutex_lock (&source->uri_mutex);
g_free (source->uri);
source->uri = g_value_dup_string (value);
+ g_mutex_unlock (&source->uri_mutex);
break;
case PROP_USERNAME:
g_free (source->username);
@@ -447,7 +520,9 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id,
const GstStructure *s = gst_value_get_structure (value);
if (source->request_headers)
gst_structure_free (source->request_headers);
- source->request_headers = s ? gst_structure_copy (s) : NULL;
+ source->request_headers =
+ s ? gst_structure_copy (s) :
+ gst_structure_new_empty (REQUEST_HEADERS_NAME);
}
break;
case PROP_COMPRESS:
@@ -505,7 +580,9 @@ gst_curl_http_src_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_URI:
+ g_mutex_lock (&source->uri_mutex);
g_value_set_string (value, source->uri);
+ g_mutex_unlock (&source->uri_mutex);
break;
case PROP_USERNAME:
g_value_set_string (value, source->username);
@@ -591,9 +668,12 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
source->proxy_user = NULL;
source->proxy_pass = NULL;
source->cookies = NULL;
- source->user_agent = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT;
+ g_assert (gst_curl_http_src_curl_capabilities != NULL);
+ source->user_agent =
+ g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
+ gst_curl_http_src_curl_capabilities->version);
source->number_cookies = 0;
- source->request_headers = NULL;
+ source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
@@ -608,8 +688,8 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
source->retries_remaining = source->total_retries;
source->slist = NULL;
+ source->accept_compressed_encodings = FALSE;
- gst_caps_replace (&source->caps, NULL);
gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
@@ -617,18 +697,23 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
g_mutex_init (&source->uri_mutex);
g_mutex_init (&source->buffer_mutex);
- g_cond_init (&source->signal);
+ g_cond_init (&source->buffer_cond);
source->buffer = NULL;
source->buffer_len = 0;
source->state = GSTCURL_NONE;
source->pending_state = GSTCURL_NONE;
- source->status_code = 0;
+ source->transfer_begun = FALSE;
+ source->data_received = FALSE;
+ source->connection_status = GSTCURL_NOT_CONNECTED;
source->http_headers = NULL;
+ source->content_type = NULL;
+ source->status_code = 0;
source->hdrs_updated = FALSE;
source->curl_result = CURLE_OK;
+ gst_caps_replace (&source->caps, NULL);
GSTCURL_FUNCTION_EXIT (source);
}
@@ -666,6 +751,8 @@ gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
#endif
/* Start the thread */
+ g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
+ klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
klass->multi_task_context.task = gst_task_new (
(GstTaskFunction) gst_curl_http_src_curl_multi_loop,
(gpointer) & klass->multi_task_context, NULL);
@@ -709,13 +796,20 @@ gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
klass->multi_task_context.refcount);
- if (klass->multi_task_context.refcount <= 0) {
+ if (klass->multi_task_context.refcount == 0) {
/* Everything's done! Clean up. */
- gst_task_pause (klass->multi_task_context.task);
+ gst_task_stop (klass->multi_task_context.task);
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
+ GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
gst_task_join (klass->multi_task_context.task);
+ gst_object_unref (klass->multi_task_context.task);
+ klass->multi_task_context.task = NULL;
+ curl_multi_cleanup (klass->multi_task_context.multi_handle);
+ klass->multi_task_context.multi_handle = NULL;
+ g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
+ GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
} else {
g_mutex_unlock (&klass->multi_task_context.mutex);
}
@@ -734,6 +828,9 @@ gst_curl_http_src_finalize (GObject * obj)
gst_curl_http_src_cleanup_instance (src);
GSTCURL_FUNCTION_EXIT (src);
+
+ /* Chain up to parent class */
+ G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
}
/*
@@ -748,20 +845,25 @@ gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
GstCurlHttpSrcClass *klass;
GstStructure *empty_headers;
+ GstBaseSrc *basesrc;
+
+ GSTCURL_FUNCTION_ENTRY (src);
klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
+ basesrc = GST_BASE_SRC_CAST (src);
- GSTCURL_FUNCTION_ENTRY (src);
+retry:
ret = GST_FLOW_OK;
-
+ /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
+ needed, multi_task_context.mutex must be acquired first */
+ g_mutex_lock (&klass->multi_task_context.mutex);
g_mutex_lock (&src->buffer_mutex);
if (src->state == GSTCURL_UNLOCK) {
ret = GST_FLOW_FLUSHING;
goto escape;
}
-retry:
if (!src->transfer_begun) {
GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
/* Create the Easy Handle and set up the session. */
@@ -771,19 +873,14 @@ retry:
goto escape;
}
- g_mutex_lock (&klass->multi_task_context.mutex);
-
if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
== FALSE) {
GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
ret = GST_FLOW_ERROR;
goto escape;
}
-
/* Signal the worker thread */
- klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT;
g_cond_signal (&klass->multi_task_context.signal);
- g_mutex_unlock (&klass->multi_task_context.mutex);
src->state = GSTCURL_OK;
src->transfer_begun = TRUE;
@@ -791,6 +888,9 @@ retry:
GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
+ if (src->http_headers != NULL) {
+ gst_structure_free (src->http_headers);
+ }
empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
URI_NAME, G_TYPE_STRING, src->uri,
@@ -800,9 +900,12 @@ retry:
GST_INFO_OBJECT (src, "Created a new headers object");
}
+ g_mutex_unlock (&klass->multi_task_context.mutex);
+
/* Wait for data to become available, then punt it downstream */
- while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)) {
- g_cond_wait (&src->signal, &src->buffer_mutex);
+ while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
+ && (src->connection_status == GSTCURL_CONNECTED)) {
+ g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
}
if (src->state == GSTCURL_UNLOCK) {
@@ -811,14 +914,16 @@ retry:
src->buffer = NULL;
src->buffer_len = 0;
}
- ret = GST_FLOW_FLUSHING;
- goto escape;
+ g_mutex_unlock (&src->buffer_mutex);
+ return GST_FLOW_FLUSHING;
}
ret = gst_curl_http_src_handle_response (src);
switch (ret) {
case GST_FLOW_ERROR:
- goto escape; /* Don't attempt a retry, just bomb out */
+ /* Don't attempt a retry, just bomb out */
+ g_mutex_unlock (&src->buffer_mutex);
+ return ret;
case GST_FLOW_CUSTOM_ERROR:
if (src->data_received == TRUE) {
/*
@@ -830,14 +935,14 @@ retry:
*/
GST_WARNING_OBJECT (src,
"Failed mid-transfer, can't continue for URI %s", src->uri);
- ret = GST_FLOW_ERROR;
- goto escape;
+ g_mutex_unlock (&src->buffer_mutex);
+ return GST_FLOW_ERROR;
}
src->retries_remaining--;
if (src->retries_remaining == 0) {
GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
- ret = GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
- goto escape;
+ g_mutex_unlock (&src->buffer_mutex);
+ return GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
}
GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
src->state = GSTCURL_NONE;
@@ -903,9 +1008,13 @@ retry:
GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
}
}
+ g_mutex_unlock (&src->buffer_mutex);
+ GSTCURL_FUNCTION_EXIT (src);
+ return ret;
escape:
g_mutex_unlock (&src->buffer_mutex);
+ g_mutex_unlock (&klass->multi_task_context.mutex);
GSTCURL_FUNCTION_EXIT (src);
return ret;
@@ -942,6 +1051,13 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
gint i;
GSTCURL_FUNCTION_ENTRY (s);
+ /* This is mandatory and yet not default option, so if this is NULL
+ * then something very bad is going on. */
+ if (s->uri == NULL) {
+ GST_ERROR_OBJECT (s, "No URI for curl!");
+ return NULL;
+ }
+
handle = curl_easy_init ();
if (handle == NULL) {
GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
@@ -949,14 +1065,7 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
}
GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
- /* This is mandatory and yet not default option, so if this is NULL
- * then something very bad is going on. */
- if (s->uri == NULL) {
- GST_ERROR_OBJECT (s, "No URI for curl!");
- return NULL;
- }
gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
-
gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
@@ -1164,7 +1273,6 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
RESPONSE_HEADERS_NAME);
if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
GstEvent *hdrs_event;
- GstStructure *empty_headers;
gst_element_post_message (GST_ELEMENT_CAST (src),
gst_message_new_element (GST_OBJECT_CAST (src),
@@ -1172,15 +1280,9 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
/* gst_event_new_custom takes ownership of our structure */
hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
- src->http_headers);
+ gst_structure_copy (src->http_headers));
gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
GST_INFO_OBJECT (src, "Pushed headers downstream");
- empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
- src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
- URI_NAME, G_TYPE_STRING, src->uri,
- REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
- RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
- gst_structure_free (empty_headers);
}
src->hdrs_updated = FALSE;
@@ -1198,29 +1300,28 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
static gboolean
gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
{
+ const GValue *response_headers;
+ const GstStructure *response_struct;
+
GST_INFO_OBJECT (src, "Negotiating caps...");
if (src->caps && src->http_headers) {
- const GValue *response_headers = gst_structure_get_value (src->http_headers,
- RESPONSE_HEADERS_NAME);
-
- if (gst_structure_has_field (gst_value_get_structure (response_headers),
- "content-type") == TRUE) {
- const GValue *gv_content_type =
- gst_structure_get_value (gst_value_get_structure (response_headers),
- "content-type");
- if (G_VALUE_HOLDS_STRING (gv_content_type) == TRUE) {
- const gchar *content_type = g_value_get_string (gv_content_type);
- GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s",
- content_type);
- src->caps = gst_caps_make_writable (src->caps);
- gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
- content_type, NULL);
- if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
- GST_ERROR_OBJECT (src, "Setting caps failed!");
- return FALSE;
- }
- } else {
- GST_ERROR_OBJECT (src, "Content Type doesn't contain expected string");
+ response_headers =
+ gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
+ if (!response_headers) {
+ GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
+ return FALSE;
+ }
+ response_struct = gst_value_get_structure (response_headers);
+ if (gst_structure_has_field_typed (response_struct, "content-type",
+ G_TYPE_STRING)) {
+ const gchar *content_type =
+ gst_structure_get_string (response_struct, "content-type");
+ GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
+ src->caps = gst_caps_make_writable (src->caps);
+ gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
+ content_type, NULL);
+ if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
+ GST_ERROR_OBJECT (src, "Setting caps failed!");
return FALSE;
}
}
@@ -1268,8 +1369,10 @@ gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
}
break;
case GST_STATE_CHANGE_READY_TO_NULL:
- /* The pipeline has ended, so signal any running request to end. */
- gst_curl_http_src_request_remove (source);
+ GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
+ /* The pipeline has ended, so signal any running request to end
+ and wait until the multi_loop has stopped using this element */
+ gst_curl_http_src_wait_until_removed (source);
gst_curl_http_src_unref_multi (source);
break;
default:
@@ -1314,17 +1417,25 @@ gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
g_free (src->cookies);
src->cookies = NULL;
+ g_free (src->user_agent);
+ src->user_agent = NULL;
+
g_mutex_clear (&src->buffer_mutex);
- g_cond_clear (&src->signal);
+ g_cond_clear (&src->buffer_cond);
g_free (src->buffer);
src->buffer = NULL;
+ if (src->request_headers) {
+ gst_structure_free (src->request_headers);
+ src->request_headers = NULL;
+ }
if (src->http_headers != NULL) {
gst_structure_free (src->http_headers);
src->http_headers = NULL;
}
+ gst_caps_replace (&src->caps, NULL);
gst_curl_http_src_destroy_easy_handle (src);
}
@@ -1338,10 +1449,12 @@ gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_URI:
+ g_mutex_lock (&src->uri_mutex);
gst_query_set_uri (query, src->uri);
if (src->redirect_uri != NULL) {
gst_query_set_uri_redirection (query, src->redirect_uri);
}
+ g_mutex_unlock (&src->uri_mutex);
ret = TRUE;
break;
default:
@@ -1366,22 +1479,17 @@ gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
response_headers = gst_structure_get_value (src->http_headers,
RESPONSE_HEADERS_NAME);
- if (gst_structure_has_field (gst_value_get_structure (response_headers),
- "content-length") == TRUE) {
- const GValue *content_length =
- gst_structure_get_value (gst_value_get_structure (response_headers),
+ if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
+ "content-length", G_TYPE_STRING)) {
+ const gchar *content_length =
+ gst_structure_get_string (gst_value_get_structure (response_headers),
"content-length");
- if (G_VALUE_HOLDS_STRING (content_length) == TRUE) {
- const gchar *len = g_value_get_string (content_length);
- *size = (guint64) g_ascii_strtoull (len, NULL, 10);
- ret = TRUE;
- } else {
- GST_ERROR_OBJECT (src, "Content Length doesn't contain expected string");
- }
+ *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
+ ret = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (src,
+ "No content length has yet been set, or there was an error!");
}
-
- GST_DEBUG_OBJECT (src,
- "No content length has yet been set, or there was an error!");
return ret;
}
@@ -1449,6 +1557,7 @@ gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
source->uri = g_strdup (uri);
if (source->uri == NULL) {
+ g_mutex_unlock (&source->uri_mutex);
return FALSE;
}
source->retries_remaining = source->total_retries;
@@ -1467,19 +1576,32 @@ static gboolean
gst_curl_http_src_unlock (GstBaseSrc * bsrc)
{
GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
+ gboolean want_removal = FALSE;
g_mutex_lock (&src->buffer_mutex);
if (src->state != GSTCURL_UNLOCK) {
if (src->state == GSTCURL_OK) {
/* A transfer is running, cancel it */
- gst_curl_http_src_request_remove (src);
+ if (src->connection_status == GSTCURL_CONNECTED) {
+ src->connection_status = GSTCURL_WANT_REMOVAL;
+ }
+ want_removal = TRUE;
}
src->pending_state = src->state;
src->state = GSTCURL_UNLOCK;
}
- g_cond_signal (&src->signal);
+ g_cond_signal (&src->buffer_cond);
g_mutex_unlock (&src->buffer_mutex);
+ if (want_removal) {
+ GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
+ GST_TYPE_CURL_HTTP_SRC,
+ GstCurlHttpSrcClass);
+ g_mutex_lock (&klass->multi_task_context.mutex);
+ g_cond_signal (&klass->multi_task_context.signal);
+ g_mutex_unlock (&klass->multi_task_context.mutex);
+ }
+
return TRUE;
}
@@ -1496,7 +1618,7 @@ gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&src->buffer_mutex);
src->state = src->pending_state;
src->pending_state = GSTCURL_NONE;
- g_cond_signal (&src->signal);
+ g_cond_signal (&src->buffer_cond);
g_mutex_unlock (&src->buffer_mutex);
return TRUE;
@@ -1510,9 +1632,10 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
{
GstCurlHttpSrcMultiTaskContext *context;
GstCurlHttpSrcQueueElement *qelement, *qnext;
- int i, still_running;
- gboolean cond = FALSE;
+ gint i, still_running = 0;
CURLMsg *curl_message;
+ GstCurlHttpSrc *elt;
+ guint active = 0;
context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
@@ -1521,49 +1644,60 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
/* Someone is holding a reference to us, but isn't using us so to avoid
* unnecessary clock cycle wasting, sit in a conditional wait until woken.
*/
- while (context->state == GSTCURL_MULTI_LOOP_STATE_WAIT) {
- GSTCURL_DEBUG_PRINT ("Entering wait state...");
+ while (context->queue == NULL
+ && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
+ GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
g_cond_wait (&context->signal, &context->mutex);
GSTCURL_DEBUG_PRINT ("Received wake up call!");
}
+ if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
+ GSTCURL_INFO_PRINT ("Got instruction to shut down");
+ goto out;
+ }
- if (context->state == GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) {
- GSTCURL_DEBUG_PRINT ("Received a new item on the queue!");
- if (context->queue == NULL) {
- GSTCURL_ERROR_PRINT ("Request Queue was empty on a Queue Event!");
- context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
- return;
- }
-
- /*
- * Use the running mutex to lock access to each element, as the
- * mutex's memory barriers stop cache optimisations from meaning
- * flag values can't be trusted. The trylock will only let us in
- * once and should fail immediately prior.
- */
- qelement = context->queue;
- while (qelement != NULL) {
- if (g_mutex_trylock (&qelement->running) == TRUE) {
+ /* check for elements that need to be started or removed */
+ qelement = context->queue;
+ while (qelement != NULL) {
+ qnext = qelement->next;
+ elt = qelement->p;
+ /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
+ needed, multi_task_context.mutex must be acquired first */
+ g_mutex_lock (&elt->buffer_mutex);
+ if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
+ curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
+ if (elt->state == GSTCURL_UNLOCK) {
+ elt->pending_state = GSTCURL_REMOVED;
+ } else {
+ elt->state = GSTCURL_REMOVED;
+ }
+ elt->connection_status = GSTCURL_NOT_CONNECTED;
+ gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
+ g_cond_signal (&elt->buffer_cond);
+ } else if (elt->connection_status == GSTCURL_CONNECTED) {
+ active++;
+ if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
- cond = TRUE;
curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
}
- qelement = qelement->next;
}
+ g_mutex_unlock (&elt->buffer_mutex);
+ qelement = qnext;
+ }
- if (cond != TRUE) {
- GSTCURL_WARNING_PRINT ("All curl handles already added for QUEUE_EVENT!");
- } else {
- GSTCURL_DEBUG_PRINT ("Finished adding all handles, continuing.");
- context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
- }
- g_mutex_unlock (&context->mutex);
- } else if (context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
+ if (active == 0) {
+ GSTCURL_DEBUG_PRINT ("No active elements");
+ goto out;
+ }
+
+ /* perform a select() on all of the active sockets and process any
+ messages from curl */
+ {
struct timeval timeout;
gint rc;
fd_set fdread, fdwrite, fdexcep;
int maxfd = -1;
long curl_timeo = -1;
+ gboolean cond = FALSE;
/* Because curl can possibly take some time here, be nice and let go of the
* mutex so other threads can perform state/queue operations as we don't
@@ -1604,6 +1738,8 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
break;
}
+ g_mutex_lock (&context->mutex);
+
/*
* Check the CURL message buffer to find out if any transfers have
* completed. If they have, call the signal_finished function which
@@ -1617,73 +1753,17 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
} else if (curl_message->msg == CURLMSG_DONE) {
/* A hack, but I have seen curl_message->easy_handle being
* NULL randomly, so check for that. */
- g_mutex_lock (&context->mutex);
- if (curl_message->easy_handle == NULL) {
- break;
- }
- curl_multi_remove_handle (context->multi_handle,
- curl_message->easy_handle);
- gst_curl_http_src_remove_queue_handle (&context->queue,
- curl_message->easy_handle, curl_message->data.result);
- g_mutex_unlock (&context->mutex);
- }
- }
-
- if (still_running == 0) {
- /* We've finished processing, so set the state to wait.
- *
- * This is a little more complex, as we need to catch the edge
- * case of another thread adding a queue item while we've been
- * working.
- */
- g_mutex_lock (&context->mutex);
- if ((context->state != GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) &&
- (context->state != GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL)) {
- context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
- }
- g_mutex_unlock (&context->mutex);
- }
- }
- /* Is the following even necessary any more...? */
- else if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
- g_mutex_unlock (&context->mutex);
- /* Something wants us to shut down, so best to do a full cleanup as it
- * might be that something's gone bang.
- */
- /*gst_curl_http_src_unref_multi (NULL, GSTCURL_RETURN_PIPELINE_NULL, TRUE); */
- GSTCURL_INFO_PRINT ("Got instruction to shut down");
- } else if (context->state == GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL) {
- qelement = context->queue;
- while (qelement != NULL) {
- qnext = qelement->next;
- if (qelement->p == context->request_removal_element) {
- g_mutex_lock (&qelement->p->buffer_mutex);
- curl_multi_remove_handle (context->multi_handle,
- context->request_removal_element->curl_handle);
- if (qelement->p->state == GSTCURL_UNLOCK) {
- qelement->p->pending_state = GSTCURL_REMOVED;
- } else {
- qelement->p->state = GSTCURL_REMOVED;
+ if (curl_message->easy_handle != NULL) {
+ curl_multi_remove_handle (context->multi_handle,
+ curl_message->easy_handle);
+ gst_curl_http_src_remove_queue_handle (&context->queue,
+ curl_message->easy_handle, curl_message->data.result);
}
- g_cond_signal (&qelement->p->signal);
- g_mutex_unlock (&qelement->p->buffer_mutex);
- gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
}
- qelement = qnext;
}
- context->request_removal_element = NULL;
- context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
- g_mutex_unlock (&context->mutex);
- } else {
- GSTCURL_WARNING_PRINT ("Curl Loop State was invalid or unsupported");
- GSTCURL_WARNING_PRINT ("Signal State is %d, resetting to RUNNING.",
- context->state);
- /* Reset to running, so if there isn't anything to do it'll be
- * changed the WAIT once curl_multi_perform says it has no active
- * handles. */
- context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
- g_mutex_unlock (&context->mutex);
}
+out:
+ g_mutex_unlock (&context->mutex);
}
/*
@@ -1763,8 +1843,8 @@ gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
/* If header field already exists, append to the end */
if (gst_structure_has_field (response_headers, header_key) == TRUE) {
header_value = g_strdup_printf ("%s, %s",
- g_value_get_string (gst_structure_get_value (response_headers,
- header_key)), header_tpl[1]);
+ gst_structure_get_string (response_headers, header_key),
+ header_tpl[1]);
gst_structure_set ((GstStructure *) response_headers, header_key,
G_TYPE_STRING, header_value, NULL);
g_free (header_value);
@@ -1850,7 +1930,7 @@ gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
}
memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
s->buffer_len += chunk_len;
- g_cond_signal (&s->signal);
+ g_cond_signal (&s->buffer_cond);
g_mutex_unlock (&s->buffer_mutex);
return chunk_len;
}
@@ -1864,10 +1944,29 @@ gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
- g_mutex_lock (&klass->multi_task_context.mutex);
- klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL;
- klass->multi_task_context.request_removal_element = src;
+ g_mutex_lock (&klass->multi_task_context.mutex);
+ g_mutex_lock (&src->buffer_mutex);
+ if (src->connection_status == GSTCURL_CONNECTED) {
+ src->connection_status = GSTCURL_WANT_REMOVAL;
+ }
+ g_mutex_unlock (&src->buffer_mutex);
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
}
+
+/*
+ * Request a cancellation of a currently running curl handle and
+ * block this thread until the src element has been removed
+ * from the queue
+ */
+static void
+gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
+{
+ gst_curl_http_src_request_remove (src);
+ g_mutex_lock (&src->buffer_mutex);
+ while (src->connection_status != GSTCURL_NOT_CONNECTED) {
+ g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
+ }
+ g_mutex_unlock (&src->buffer_mutex);
+}
diff --git a/ext/curl/gstcurlhttpsrc.h b/ext/curl/gstcurlhttpsrc.h
index 72af6dc3b..d7e65d2a7 100644
--- a/ext/curl/gstcurlhttpsrc.h
+++ b/ext/curl/gstcurlhttpsrc.h
@@ -110,18 +110,12 @@ struct _GstCurlHttpSrcMultiTaskContext
guint refcount;
GCond signal;
- GstCurlHttpSrc *request_removal_element;
-
GstCurlHttpSrcQueueElement *queue;
enum
{
- GSTCURL_MULTI_LOOP_STATE_WAIT = 0,
- GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT,
GSTCURL_MULTI_LOOP_STATE_RUNNING,
- GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL,
- GSTCURL_MULTI_LOOP_STATE_STOP,
- GSTCURL_MULTI_LOOP_STATE_MAX
+ GSTCURL_MULTI_LOOP_STATE_STOP
} state;
/* < private > */
@@ -200,11 +194,16 @@ struct _GstCurlHttpSrc
} state, pending_state;
CURL *curl_handle;
GMutex buffer_mutex;
- GCond signal;
+ GCond buffer_cond;
gchar *buffer;
guint buffer_len;
gboolean transfer_begun;
gboolean data_received;
+ enum {
+ GSTCURL_NOT_CONNECTED,
+ GSTCURL_CONNECTED,
+ GSTCURL_WANT_REMOVAL
+ } connection_status;
/*
* Response Headers
@@ -220,34 +219,6 @@ struct _GstCurlHttpSrc
GstCaps *caps;
};
-enum
-{
- PROP_0,
- PROP_URI,
- PROP_USERNAME,
- PROP_PASSWORD,
- PROP_PROXYURI,
- PROP_PROXYUSERNAME,
- PROP_PROXYPASSWORD,
- PROP_COOKIES,
- PROP_USERAGENT,
- PROP_HEADERS,
- PROP_COMPRESS,
- PROP_REDIRECT,
- PROP_MAXREDIRECT,
- PROP_KEEPALIVE,
- PROP_TIMEOUT,
- PROP_STRICT_SSL,
- PROP_SSL_CA_FILE,
- PROP_RETRIES,
- PROP_CONNECTIONMAXTIME,
- PROP_MAXCONCURRENT_SERVER,
- PROP_MAXCONCURRENT_PROXY,
- PROP_MAXCONCURRENT_GLOBAL,
- PROP_HTTPVERSION,
- PROP_MAX
-};
-
GType gst_curl_http_src_get_type (void);
G_END_DECLS
diff --git a/ext/curl/gstcurlqueue.c b/ext/curl/gstcurlqueue.c
index bfe6da957..627a2ba25 100644
--- a/ext/curl/gstcurlqueue.c
+++ b/ext/curl/gstcurlqueue.c
@@ -83,8 +83,9 @@ gst_curl_http_src_add_queue_item (GstCurlHttpSrcQueueElement ** queue,
}
insert_point->p = s;
- g_mutex_init (&insert_point->running);
+ g_atomic_int_set (&insert_point->running, 0);
insert_point->next = NULL;
+ s->connection_status = GSTCURL_CONNECTED;
return TRUE;
}
@@ -127,6 +128,7 @@ gst_curl_http_src_remove_queue_item (GstCurlHttpSrcQueueElement ** queue,
prev_qelement->next = this_qelement->next;
}
g_free (this_qelement);
+ s->connection_status = GSTCURL_NOT_CONNECTED;
return TRUE;
}
@@ -164,12 +166,13 @@ gst_curl_http_src_remove_queue_handle (GstCurlHttpSrcQueueElement ** queue,
this_qelement->p->uri); */
/* First, signal the transfer owner thread to wake up */
g_mutex_lock (&this_qelement->p->buffer_mutex);
- g_cond_signal (&this_qelement->p->signal);
+ g_cond_signal (&this_qelement->p->buffer_cond);
if (this_qelement->p->state != GSTCURL_UNLOCK) {
this_qelement->p->state = GSTCURL_DONE;
} else {
this_qelement->p->pending_state = GSTCURL_DONE;
}
+ this_qelement->p->connection_status = GSTCURL_NOT_CONNECTED;
this_qelement->p->curl_result = result;
g_mutex_unlock (&this_qelement->p->buffer_mutex);
diff --git a/ext/curl/gstcurlqueue.h b/ext/curl/gstcurlqueue.h
index 58eb340f4..e43359a58 100644
--- a/ext/curl/gstcurlqueue.h
+++ b/ext/curl/gstcurlqueue.h
@@ -51,7 +51,7 @@
struct _GstCurlHttpSrcQueueElement
{
GstCurlHttpSrc *p;
- GMutex running;
+ volatile gint running;
GstCurlHttpSrcQueueElement *next;
};