summaryrefslogtreecommitdiff
path: root/ext/soup
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2016-05-31 15:29:13 +0300
committerSebastian Dröge <sebastian@centricular.com>2016-06-03 13:25:38 +0300
commitfdac3a7a231f3848665636cf8122f96103b46e3b (patch)
tree496b741860370060f325f2f666fca072ceb93686 /ext/soup
parentcfaedbc2658537ea6f12256bf864a46c015e1b77 (diff)
WIP revert soup
Diffstat (limited to 'ext/soup')
-rw-r--r--ext/soup/Makefile.am4
-rw-r--r--ext/soup/gstsouphttpclientsink.c2
-rw-r--r--ext/soup/gstsouphttpsrc.c766
-rw-r--r--ext/soup/gstsouphttpsrc.h16
4 files changed, 484 insertions, 304 deletions
diff --git a/ext/soup/Makefile.am b/ext/soup/Makefile.am
index 63e5fec9e..c1bd73571 100644
--- a/ext/soup/Makefile.am
+++ b/ext/soup/Makefile.am
@@ -4,8 +4,8 @@ libgstsouphttpsrc_la_SOURCES = gstsouphttpsrc.c gstsouphttpclientsink.c gstsoupu
libgstsouphttpsrc_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) \
$(GST_CFLAGS) $(SOUP_CFLAGS) \
- -DSOUP_VERSION_MIN_REQUIRED=SOUP_VERSION_2_48 \
- -DSOUP_VERSION_MAX_ALLOWED=SOUP_DEPRECATED_IN_2_48
+ -DSOUP_VERSION_MIN_REQUIRED=SOUP_VERSION_2_26 \
+ -DSOUP_VERSION_MAX_ALLOWED=SOUP_DEPRECATED_IN_2_26
libgstsouphttpsrc_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) -lgsttag-@GST_API_VERSION@ $(GST_BASE_LIBS) $(SOUP_LIBS)
libgstsouphttpsrc_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstsouphttpsrc_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
diff --git a/ext/soup/gstsouphttpclientsink.c b/ext/soup/gstsouphttpclientsink.c
index c81248874..84be5e8c3 100644
--- a/ext/soup/gstsouphttpclientsink.c
+++ b/ext/soup/gstsouphttpclientsink.c
@@ -264,10 +264,8 @@ gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
g_list_free_full (souphttpsink->streamheader_buffers,
(GDestroyNotify) gst_buffer_unref);
- souphttpsink->streamheader_buffers = NULL;
g_list_free_full (souphttpsink->sent_buffers,
(GDestroyNotify) gst_buffer_unref);
- souphttpsink->sent_buffers = NULL;
}
static gboolean
diff --git a/ext/soup/gstsouphttpsrc.c b/ext/soup/gstsouphttpsrc.c
index 0bd51a147..f1475ae39 100644
--- a/ext/soup/gstsouphttpsrc.c
+++ b/ext/soup/gstsouphttpsrc.c
@@ -77,11 +77,19 @@
#endif
#include <gst/gstelement.h>
#include <gst/gst-i18n-plugin.h>
-#include <gio/gio.h>
#include <libsoup/soup.h>
#include "gstsouphttpsrc.h"
#include "gstsouputils.h"
+/* libsoup before 2.47.0 was stealing our main context from us,
+ * so we can't reliable use it to clean up all pending resources
+ * once we're done... let's just continue leaking on old versions.
+ * https://bugzilla.gnome.org/show_bug.cgi?id=663944
+ */
+#if defined(SOUP_MINOR_VERSION) && SOUP_MINOR_VERSION >= 47
+#define LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT 1
+#endif
+
#include <gst/tag/tag.h>
GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug);
@@ -165,18 +173,31 @@ static char *gst_soup_http_src_unicodify (const char *str);
static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src,
const gchar * method);
static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src);
+static void gst_soup_http_src_queue_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src,
guint64 offset, guint64 stop_offset);
+static void gst_soup_http_src_session_unpause_message (GstSoupHTTPSrc * src);
+static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src);
static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src);
static void gst_soup_http_src_parse_status (SoupMessage * msg,
GstSoupHTTPSrc * src);
-static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src,
- SoupMessage * msg);
+static void gst_soup_http_src_chunk_free (gpointer gstbuf);
+static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg,
+ gsize max_len, gpointer user_data);
+static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg,
+ SoupBuffer * chunk, GstSoupHTTPSrc * src);
+static void gst_soup_http_src_response_cb (SoupSession * session,
+ SoupMessage * msg, GstSoupHTTPSrc * src);
+static void gst_soup_http_src_got_headers_cb (SoupMessage * msg,
+ GstSoupHTTPSrc * src);
+static void gst_soup_http_src_got_body_cb (SoupMessage * msg,
+ GstSoupHTTPSrc * src);
+static void gst_soup_http_src_finished_cb (SoupMessage * msg,
+ GstSoupHTTPSrc * src);
static void gst_soup_http_src_authenticate_cb (SoupSession * session,
SoupMessage * msg, SoupAuth * auth, gboolean retrying,
GstSoupHTTPSrc * src);
-static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
#define gst_soup_http_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
@@ -429,6 +450,8 @@ gst_soup_http_src_class_init (GstSoupHTTPSrcClass * klass)
static void
gst_soup_http_src_reset (GstSoupHTTPSrc * src)
{
+ src->interrupted = FALSE;
+ src->retry = FALSE;
src->retry_count = 0;
src->have_size = FALSE;
src->got_headers = FALSE;
@@ -440,8 +463,6 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
src->have_body = FALSE;
src->ret = GST_FLOW_OK;
- g_cancellable_reset (src->cancellable);
- gst_soup_http_src_destroy_input_stream (src);
gst_caps_replace (&src->src_caps, NULL);
g_free (src->iradio_name);
@@ -458,9 +479,7 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
const gchar *proxy;
g_mutex_init (&src->mutex);
- g_cond_init (&src->have_headers_cond);
- src->cancellable = g_cancellable_new ();
- src->poll_context = g_main_context_new ();
+ g_cond_init (&src->request_finished_cond);
src->location = NULL;
src->redirection_uri = NULL;
src->automatic_redirect = TRUE;
@@ -471,6 +490,8 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
src->proxy_pw = NULL;
src->cookies = NULL;
src->iradio_mode = DEFAULT_IRADIO_MODE;
+ src->loop = NULL;
+ src->context = NULL;
src->session = NULL;
src->msg = NULL;
src->timeout = DEFAULT_TIMEOUT;
@@ -513,9 +534,7 @@ gst_soup_http_src_finalize (GObject * gobject)
GST_DEBUG_OBJECT (src, "finalize");
g_mutex_clear (&src->mutex);
- g_cond_clear (&src->have_headers_cond);
- g_object_unref (src->cancellable);
- g_main_context_unref (src->poll_context);
+ g_cond_clear (&src->request_finished_cond);
g_free (src->location);
g_free (src->redirection_uri);
g_free (src->user_agent);
@@ -766,25 +785,23 @@ gst_soup_http_src_unicodify (const gchar * str)
}
static void
-gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
+gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
{
- if (src->input_stream) {
- if (src->poll_source) {
- g_source_destroy (src->poll_source);
- g_source_unref (src->poll_source);
- src->poll_source = NULL;
- }
- g_input_stream_close (src->input_stream, src->cancellable, NULL);
- g_object_unref (src->input_stream);
- src->input_stream = NULL;
+ if (src->msg != NULL) {
+ GST_INFO_OBJECT (src, "Cancelling message");
+ src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED;
+ soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
}
+ src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
+ src->msg = NULL;
}
static void
-gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
+gst_soup_http_src_queue_message (GstSoupHTTPSrc * src)
{
- g_cancellable_cancel (src->cancellable);
- g_cond_signal (&src->have_headers_cond);
+ soup_session_queue_message (src->session, src->msg,
+ (SoupSessionCallback) gst_soup_http_src_response_cb, src);
+ src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED;
}
static gboolean
@@ -889,6 +906,19 @@ gst_soup_http_src_add_extra_headers (GstSoupHTTPSrc * src)
return gst_structure_foreach (src->extra_headers, _append_extra_headers, src);
}
+
+static void
+gst_soup_http_src_session_unpause_message (GstSoupHTTPSrc * src)
+{
+ soup_session_unpause_message (src->session, src->msg);
+}
+
+static void
+gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src)
+{
+ soup_session_pause_message (src->session, src->msg);
+}
+
static gboolean
gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
{
@@ -903,17 +933,32 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
return FALSE;
}
+ if (!src->context)
+ src->context = g_main_context_new ();
+
+ if (!src->loop)
+ src->loop = g_main_loop_new (src->context, TRUE);
+ if (!src->loop) {
+ GST_ELEMENT_ERROR (src, LIBRARY, INIT,
+ (NULL), ("Failed to start GMainLoop"));
+ g_main_context_unref (src->context);
+ return FALSE;
+ }
+
if (!src->session) {
GST_DEBUG_OBJECT (src, "Creating session");
if (src->proxy == NULL) {
src->session =
- soup_session_new_with_options (SOUP_SESSION_USER_AGENT,
- src->user_agent, SOUP_SESSION_TIMEOUT, src->timeout,
+ soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
+ src->context, SOUP_SESSION_USER_AGENT, src->user_agent,
+ SOUP_SESSION_TIMEOUT, src->timeout,
SOUP_SESSION_SSL_STRICT, src->ssl_strict,
+ SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_PROXY_RESOLVER_DEFAULT,
SOUP_SESSION_TLS_INTERACTION, src->tls_interaction, NULL);
} else {
src->session =
- soup_session_new_with_options (SOUP_SESSION_PROXY_URI, src->proxy,
+ soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
+ src->context, SOUP_SESSION_PROXY_URI, src->proxy,
SOUP_SESSION_TIMEOUT, src->timeout,
SOUP_SESSION_SSL_STRICT, src->ssl_strict,
SOUP_SESSION_USER_AGENT, src->user_agent,
@@ -951,11 +996,22 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
return TRUE;
}
+#ifdef LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT
+static gboolean
+dummy_idle_cb (gpointer data)
+{
+ return FALSE /* Idle source is removed */ ;
+}
+#endif
+
static void
gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
{
GST_DEBUG_OBJECT (src, "Closing session");
+ if (src->loop)
+ g_main_loop_quit (src->loop);
+
g_mutex_lock (&src->mutex);
if (src->session) {
soup_session_abort (src->session); /* This unrefs the message. */
@@ -963,6 +1019,33 @@ gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
src->session = NULL;
src->msg = NULL;
}
+ if (src->loop) {
+#ifdef LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT
+ GSource *idle_source;
+
+ /* Iterating the main context to give GIO cancellables a chance
+ * to initiate cleanups. Wihout this, resources allocated by
+ * libsoup for the connection are not released and socket fd is
+ * leaked. */
+ idle_source = g_idle_source_new ();
+ /* Suppressing "idle souce without callback" warning */
+ g_source_set_callback (idle_source, dummy_idle_cb, NULL, NULL);
+ g_source_set_priority (idle_source, G_PRIORITY_LOW);
+ g_source_attach (idle_source, src->context);
+ /* Acquiring the context. Idle source guarantees that we'll not block. */
+ g_main_context_push_thread_default (src->context);
+ g_main_context_iteration (src->context, TRUE);
+ /* Ensuring that there's no unhandled pending events left. */
+ while (g_main_context_iteration (src->context, FALSE));
+ g_main_context_pop_thread_default (src->context);
+ g_source_unref (idle_source);
+#endif
+
+ g_main_loop_unref (src->loop);
+ g_main_context_unref (src->context);
+ src->loop = NULL;
+ src->context = NULL;
+ }
g_mutex_unlock (&src->mutex);
}
@@ -1017,7 +1100,7 @@ insert_http_header (const gchar * name, const gchar * value, gpointer user_data)
}
static void
-gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
+gst_soup_http_src_got_headers_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
{
const char *value;
GstTagList *tag_list;
@@ -1044,14 +1127,11 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
return;
}
- if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
- /* force an error */
- gst_soup_http_src_parse_status (msg, src);
+ if (msg->status_code == SOUP_STATUS_UNAUTHORIZED)
return;
- }
+ src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING;
src->got_headers = TRUE;
- g_cond_broadcast (&src->have_headers_cond);
http_headers = gst_structure_new_empty ("http-headers");
gst_structure_set (http_headers, "uri", G_TYPE_STRING, src->location, NULL);
@@ -1143,7 +1223,7 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
if (param != NULL)
rate = atol (param);
- src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
+ src->src_caps = gst_caps_new_simple ("audio/x-raw",
"format", G_TYPE_STRING, "S16BE",
"layout", G_TYPE_STRING, "interleaved",
"channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
@@ -1219,23 +1299,251 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
* GST_FLOW_ERROR from the create function instead of having
* got_chunk_cb overwrite src->ret with FLOW_OK again. */
if (src->ret == GST_FLOW_ERROR || src->ret == GST_FLOW_EOS) {
+ gst_soup_http_src_session_pause_message (src);
+
+ if (src->loop)
+ g_main_loop_quit (src->loop);
+ }
+ g_cond_signal (&src->request_finished_cond);
+}
+
+/* Have body. Signal EOS. */
+static void
+gst_soup_http_src_got_body_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
+{
+ if (G_UNLIKELY (msg != src->msg)) {
+ GST_DEBUG_OBJECT (src, "got body, but not for current message");
+ return;
+ }
+ if (G_UNLIKELY (src->session_io_status !=
+ GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
+ /* Probably a redirect. */
+ return;
}
+ GST_DEBUG_OBJECT (src, "got body");
+ src->ret = GST_FLOW_EOS;
+ src->have_body = TRUE;
+
+ /* no need to interrupt the message here, we do it on the
+ * finished_cb anyway if needed. And getting the body might mean
+ * that the connection was hang up before finished. This happens when
+ * the pipeline is stalled for too long (long pauses during playback).
+ * Best to let it continue from here and pause because it reached the
+ * final bytes based on content_size or received an out of range error */
+}
+
+/* Finished. Signal EOS. */
+static void
+gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
+{
+ if (G_UNLIKELY (msg != src->msg)) {
+ GST_DEBUG_OBJECT (src, "finished, but not for current message");
+ return;
+ }
+ GST_INFO_OBJECT (src, "finished, io status: %d", src->session_io_status);
+ src->ret = GST_FLOW_EOS;
+ if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED) {
+ /* gst_soup_http_src_cancel_message() triggered this; probably a seek
+ * that occurred in the QUEUEING state; i.e. before the connection setup
+ * was complete. Do nothing */
+ GST_DEBUG_OBJECT (src, "cancelled");
+ } else if (src->session_io_status ==
+ GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING && src->read_position > 0 &&
+ (src->have_size && src->read_position < src->content_size) &&
+ (src->max_retries == -1 || src->retry_count < src->max_retries)) {
+ /* The server disconnected while streaming. Reconnect and seeking to the
+ * last location. */
+ src->retry = TRUE;
+ src->retry_count++;
+ src->ret = GST_FLOW_CUSTOM_ERROR;
+ } else if (G_UNLIKELY (src->session_io_status !=
+ GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
+ if (msg->method == SOUP_METHOD_HEAD) {
+ GST_DEBUG_OBJECT (src, "Ignoring error %d:%s during HEAD request",
+ msg->status_code, msg->reason_phrase);
+ } else {
+ gst_soup_http_src_parse_status (msg, src);
+ }
+ }
+ if (src->loop)
+ g_main_loop_quit (src->loop);
+ g_cond_signal (&src->request_finished_cond);
+}
+
+/* Buffer lifecycle management.
+ *
+ * gst_soup_http_src_create() runs the GMainLoop for this element, to let
+ * Soup take control.
+ * A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and
+ * associated with a SoupBuffer.
+ * Soup reads HTTP data in the GstBuffer's data buffer.
+ * The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer.
+ * That sets gst_soup_http_src_create()'s return argument to the GstBuffer,
+ * increments its refcount (to 2), pauses the flow of data from the HTTP
+ * source to prevent gst_soup_http_src_got_chunk_cb() from being called
+ * again and breaks out of the GMainLoop.
+ * Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the
+ * SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the
+ * refcount (to 1).
+ * gst_soup_http_src_create() returns the GstBuffer. It will be freed by a
+ * downstream element.
+ * If Soup fails to read HTTP data, it does not call
+ * gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and
+ * calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's
+ * refcount to 0, freeing it.
+ */
+
+typedef struct
+{
+ GstBuffer *buffer;
+ GstMapInfo map;
+} SoupGstChunk;
+
+static void
+gst_soup_http_src_chunk_free (gpointer user_data)
+{
+ SoupGstChunk *chunk = (SoupGstChunk *) user_data;
+
+ gst_buffer_unmap (chunk->buffer, &chunk->map);
+ gst_buffer_unref (chunk->buffer);
+ g_slice_free (SoupGstChunk, chunk);
}
-static GstBuffer *
-gst_soup_http_src_alloc_buffer (GstSoupHTTPSrc * src)
+static SoupBuffer *
+gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len,
+ gpointer user_data)
{
+ GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data;
GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
- GstFlowReturn rc;
GstBuffer *gstbuf;
+ SoupBuffer *soupbuf;
+ gsize length;
+ GstFlowReturn rc;
+ SoupGstChunk *chunk;
- rc = GST_BASE_SRC_CLASS (parent_class)->alloc (basesrc, -1,
- basesrc->blocksize, &gstbuf);
+ if (max_len)
+ length = MIN (basesrc->blocksize, max_len);
+ else
+ length = basesrc->blocksize;
+ GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT,
+ length, max_len);
+
+ rc = GST_BASE_SRC_CLASS (parent_class)->alloc (basesrc, -1, length, &gstbuf);
if (G_UNLIKELY (rc != GST_FLOW_OK)) {
+ /* Failed to allocate buffer. Stall SoupSession and return error code
+ * to create(). */
+ src->ret = rc;
+ g_main_loop_quit (src->loop);
return NULL;
}
- return gstbuf;
+ chunk = g_slice_new0 (SoupGstChunk);
+ chunk->buffer = gstbuf;
+ gst_buffer_map (gstbuf, &chunk->map, GST_MAP_READWRITE);
+
+ soupbuf = soup_buffer_new_with_owner (chunk->map.data, chunk->map.size,
+ chunk, gst_soup_http_src_chunk_free);
+
+ return soupbuf;
+}
+
+static void
+gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
+ GstSoupHTTPSrc * src)
+{
+ GstBaseSrc *basesrc;
+ guint64 new_position;
+ SoupGstChunk *gchunk;
+
+ if (G_UNLIKELY (msg != src->msg)) {
+ GST_DEBUG_OBJECT (src, "got chunk, but not for current message");
+ return;
+ }
+ if (G_UNLIKELY (!src->outbuf)) {
+ GST_DEBUG_OBJECT (src, "got chunk but we're not expecting one");
+ src->ret = GST_FLOW_OK;
+ gst_soup_http_src_cancel_message (src);
+ g_main_loop_quit (src->loop);
+ return;
+ }
+
+ /* We got data, reset the retry counter */
+ src->retry_count = 0;
+
+ src->have_body = FALSE;
+ if (G_UNLIKELY (src->session_io_status !=
+ GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
+ /* Probably a redirect. */
+ return;
+ }
+ basesrc = GST_BASE_SRC_CAST (src);
+ GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes",
+ chunk->length);
+
+ /* Extract the GstBuffer from the SoupBuffer and set its fields. */
+ gchunk = (SoupGstChunk *) soup_buffer_get_owner (chunk);
+ *src->outbuf = gchunk->buffer;
+
+ gst_buffer_resize (*src->outbuf, 0, chunk->length);
+ GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.position;
+
+ gst_buffer_ref (*src->outbuf);
+
+ new_position = src->read_position + chunk->length;
+ if (G_LIKELY (src->request_position == src->read_position))
+ src->request_position = new_position;
+ src->read_position = new_position;
+
+ if (src->have_size) {
+ if (new_position > src->content_size) {
+ GST_DEBUG_OBJECT (src, "Got position previous estimated content size "
+ "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", new_position,
+ src->content_size);
+ src->content_size = new_position;
+ basesrc->segment.duration = src->content_size;
+ gst_element_post_message (GST_ELEMENT (src),
+ gst_message_new_duration_changed (GST_OBJECT (src)));
+ } else if (new_position == src->content_size) {
+ GST_DEBUG_OBJECT (src, "We're EOS now");
+ }
+ }
+
+ src->ret = GST_FLOW_OK;
+ g_main_loop_quit (src->loop);
+ gst_soup_http_src_session_pause_message (src);
+}
+
+static void
+gst_soup_http_src_response_cb (SoupSession * session, SoupMessage * msg,
+ GstSoupHTTPSrc * src)
+{
+ if (G_UNLIKELY (msg != src->msg)) {
+ GST_DEBUG_OBJECT (src, "got response %d: %s, but not for current message",
+ msg->status_code, msg->reason_phrase);
+ return;
+ }
+ if (G_UNLIKELY (src->session_io_status !=
+ GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)
+ && SOUP_STATUS_IS_REDIRECTION (msg->status_code)) {
+ /* Ignore redirections. */
+ return;
+ }
+ GST_INFO_OBJECT (src, "got response %d: %s", msg->status_code,
+ msg->reason_phrase);
+ if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING &&
+ src->read_position > 0 && (src->have_size
+ && src->read_position < src->content_size) &&
+ (src->max_retries == -1 || src->retry_count < src->max_retries)) {
+ /* The server disconnected while streaming. Reconnect and seeking to the
+ * last location. */
+ src->retry = TRUE;
+ src->retry_count++;
+ } else {
+ gst_soup_http_src_parse_status (msg, src);
+ }
+ /* The session's SoupMessage object expires after this callback returns. */
+ src->msg = NULL;
+ g_main_loop_quit (src->loop);
}
#define SOUP_HTTP_SRC_ERROR(src,soup_msg,cat,code,error_message) \
@@ -1271,6 +1579,8 @@ gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src)
break;
case SOUP_STATUS_IO_ERROR:
if (src->max_retries == -1 || src->retry_count < src->max_retries) {
+ src->retry = TRUE;
+ src->retry_count++;
src->ret = GST_FLOW_CUSTOM_ERROR;
} else {
SOUP_HTTP_SRC_ERROR (src, msg, RESOURCE, READ,
@@ -1343,6 +1653,7 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
("Error parsing URL."), ("URL: %s", src->location));
return FALSE;
}
+ src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
if (!src->keep_alive) {
soup_message_headers_append (src->msg->request_headers, "Connection",
"close");
@@ -1359,9 +1670,20 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
*cookie);
}
}
+ src->retry = FALSE;
+ g_signal_connect (src->msg, "got_headers",
+ G_CALLBACK (gst_soup_http_src_got_headers_cb), src);
+ g_signal_connect (src->msg, "got_body",
+ G_CALLBACK (gst_soup_http_src_got_body_cb), src);
+ g_signal_connect (src->msg, "finished",
+ G_CALLBACK (gst_soup_http_src_finished_cb), src);
+ g_signal_connect (src->msg, "got_chunk",
+ G_CALLBACK (gst_soup_http_src_got_chunk_cb), src);
soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS |
(src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
+ soup_message_set_chunk_allocator (src->msg,
+ gst_soup_http_src_chunk_allocator, src, NULL);
gst_soup_http_src_add_range_header (src, src->request_position,
src->stop_position);
@@ -1370,291 +1692,146 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
return TRUE;
}
-static void
-gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
-{
- if (!src->input_stream)
- return;
-
- src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
- && g_pollable_input_stream_can_poll ((GPollableInputStream *)
- src->input_stream);
-}
-
static GstFlowReturn
-gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
+gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method,
+ GstBuffer ** outbuf)
{
- g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
-
- g_assert (src->input_stream == NULL);
- g_assert (src->poll_source == NULL);
-
- /* FIXME We are ignoring the GError here, might be useful to debug */
- src->input_stream =
- soup_session_send (src->session, src->msg, src->cancellable, NULL);
-
- if (g_cancellable_is_cancelled (src->cancellable))
- return GST_FLOW_FLUSHING;
-
- gst_soup_http_src_got_headers (src, src->msg);
+ /* If we're not OK, just go out of here */
if (src->ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (src, "Previous flow return not OK: %s",
+ gst_flow_get_name (src->ret));
return src->ret;
}
- if (!src->input_stream) {
- GST_DEBUG_OBJECT (src, "Didn't get an input stream");
- return GST_FLOW_ERROR;
- }
-
- if (SOUP_STATUS_IS_SUCCESSFUL (src->msg->status_code)) {
- GST_DEBUG_OBJECT (src, "Successfully got a reply");
- } else {
- /* FIXME - be more helpful to people debugging */
- return GST_FLOW_ERROR;
- }
-
- gst_soup_http_src_check_input_stream_interfaces (src);
-
- return GST_FLOW_OK;
-}
-
-static GstFlowReturn
-gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method)
-{
- if (src->max_retries != -1 && src->retry_count > src->max_retries) {
- GST_DEBUG_OBJECT (src, "Max retries reached");
- src->ret = GST_FLOW_ERROR;
- return src->ret;
- }
-
- src->retry_count++;
- /* EOS immediately if we have an empty segment */
- if (src->request_position == src->stop_position)
- return GST_FLOW_EOS;
-
GST_LOG_OBJECT (src, "Running request for method: %s", method);
-
- /* Update the position if we are retrying */
if (src->msg && (src->request_position != src->read_position)) {
- gst_soup_http_src_add_range_header (src, src->request_position,
- src->stop_position);
- }
+ if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE) {
+ /* EOS immediately if we have an empty segment */
+ if (src->request_position == src->stop_position)
+ return GST_FLOW_EOS;
- if (!src->msg) {
- if (!gst_soup_http_src_build_message (src, method)) {
- return GST_FLOW_ERROR;
+ gst_soup_http_src_add_range_header (src, src->request_position,
+ src->stop_position);
+ } else {
+ GST_DEBUG_OBJECT (src, "Seek from position %" G_GUINT64_FORMAT
+ " to %" G_GUINT64_FORMAT ": requeueing connection request",
+ src->read_position, src->request_position);
+ gst_soup_http_src_cancel_message (src);
}
}
+ if (!src->msg) {
+ /* EOS immediately if we have an empty segment */
+ if (src->request_position == src->stop_position)
+ return GST_FLOW_EOS;
- if (g_cancellable_is_cancelled (src->cancellable)) {
- GST_INFO_OBJECT (src, "interrupted");
- src->ret = GST_FLOW_FLUSHING;
- goto done;
+ if (!gst_soup_http_src_build_message (src, method))
+ return GST_FLOW_ERROR;
}
- src->ret = gst_soup_http_src_send_message (src);
-done:
- return src->ret;
-}
-
-static void
-gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
-{
- GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
- guint64 new_position;
-
- new_position = src->read_position + bytes_read;
- if (G_LIKELY (src->request_position == src->read_position))
- src->request_position = new_position;
- src->read_position = new_position;
-
- if (src->have_size) {
- if (new_position > src->content_size) {
- GST_DEBUG_OBJECT (src, "Got position previous estimated content size "
- "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", new_position,
- src->content_size);
- src->content_size = new_position;
- basesrc->segment.duration = src->content_size;
- gst_element_post_message (GST_ELEMENT (src),
- gst_message_new_duration_changed (GST_OBJECT (src)));
- } else if (new_position == src->content_size) {
- GST_DEBUG_OBJECT (src, "We're EOS now");
+ src->ret = GST_FLOW_CUSTOM_ERROR;
+ src->outbuf = outbuf;
+ do {
+ if (src->interrupted) {
+ GST_INFO_OBJECT (src, "interrupted");
+ src->ret = GST_FLOW_FLUSHING;
+ break;
}
- }
-}
+ if (src->retry) {
+ GST_INFO_OBJECT (src, "Reconnecting");
-static gboolean
-_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
- gpointer udata)
-{
- GstSoupHTTPSrc *src = udata;
+ /* EOS immediately if we have an empty segment */
+ if (src->request_position == src->stop_position)
+ return GST_FLOW_EOS;
- src->have_data = TRUE;
- return TRUE;
-}
-
-/* Need to wait on a gsource to know when data is available */
-static gboolean
-gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
-{
- src->have_data = FALSE;
-
- if (!src->poll_source) {
- src->poll_source =
- g_pollable_input_stream_create_source ((GPollableInputStream *)
- src->input_stream, src->cancellable);
- g_source_set_callback (src->poll_source,
- (GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
- g_source_attach (src->poll_source, src->poll_context);
- }
-
- while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
- g_main_context_iteration (src->poll_context, TRUE);
- }
-
- return src->have_data;
-}
-
-static GstFlowReturn
-gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
-{
- gssize read_bytes;
- GstMapInfo mapinfo;
- GstBaseSrc *bsrc;
- GstFlowReturn ret;
- GError *err = NULL;
-
- bsrc = GST_BASE_SRC_CAST (src);
-
- *outbuf = gst_soup_http_src_alloc_buffer (src);
- if (!*outbuf) {
- GST_WARNING_OBJECT (src, "Failed to allocate buffer");
- return GST_FLOW_ERROR;
- }
-
- if (!gst_buffer_map (*outbuf, &mapinfo, GST_MAP_WRITE)) {
- GST_WARNING_OBJECT (src, "Failed to map buffer");
- return GST_FLOW_ERROR;
- }
+ if (!gst_soup_http_src_build_message (src, method))
+ return GST_FLOW_ERROR;
+ src->retry = FALSE;
+ continue;
+ }
+ if (!src->msg) {
+ GST_DEBUG_OBJECT (src, "EOS reached");
+ break;
+ }
- if (src->has_pollable_interface) {
- while (1) {
- read_bytes =
- g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
- src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
- &err);
- if (read_bytes == -1) {
- if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_error_free (err);
- err = NULL;
+ switch (src->session_io_status) {
+ case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE:
+ GST_INFO_OBJECT (src, "Queueing connection request");
+ gst_soup_http_src_queue_message (src);
+ break;
+ case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED:
+ break;
+ case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING:
+ gst_soup_http_src_session_unpause_message (src);
+ break;
+ case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED:
+ /* Impossible. */
+ break;
+ }
- /* no data yet, wait */
- if (gst_soup_http_src_wait_for_data (src))
- /* retry */
- continue;
- }
- }
- break;
+ if (src->ret == GST_FLOW_CUSTOM_ERROR) {
+ g_main_context_push_thread_default (src->context);
+ g_main_loop_run (src->loop);
+ g_main_context_pop_thread_default (src->context);
}
- } else {
- read_bytes =
- g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
- src->cancellable, NULL);
- }
- if (err)
- g_error_free (err);
+ } while (src->ret == GST_FLOW_CUSTOM_ERROR);
- GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
- read_bytes);
+ /* Let the request finish if we had a stop position and are there */
+ if (src->ret == GST_FLOW_OK && src->stop_position != -1
+ && src->read_position >= src->stop_position) {
+ src->outbuf = NULL;
+ gst_soup_http_src_session_unpause_message (src);
+ g_main_context_push_thread_default (src->context);
+ g_main_loop_run (src->loop);
+ g_main_context_pop_thread_default (src->context);
- g_mutex_lock (&src->mutex);
- if (g_cancellable_is_cancelled (src->cancellable)) {
- gst_buffer_unmap (*outbuf, &mapinfo);
- gst_buffer_unref (*outbuf);
- g_mutex_unlock (&src->mutex);
- return GST_FLOW_FLUSHING;
+ g_cond_signal (&src->request_finished_cond);
+ /* Return OK unconditionally here, src->ret will
+ * be most likely be EOS now but we want to
+ * consume the buffer we got above */
+ return GST_FLOW_OK;
}
- g_mutex_unlock (&src->mutex);
- gst_buffer_unmap (*outbuf, &mapinfo);
- if (read_bytes > 0) {
- gst_buffer_set_size (*outbuf, read_bytes);
- GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position;
- ret = GST_FLOW_OK;
- gst_soup_http_src_update_position (src, read_bytes);
+ if (src->ret == GST_FLOW_CUSTOM_ERROR)
+ src->ret = GST_FLOW_EOS;
+ g_cond_signal (&src->request_finished_cond);
- /* Got some data, reset retry counter */
- src->retry_count = 0;
- } else {
+ /* basesrc assumes that we don't return a buffer if
+ * something else than OK is returned. It will just
+ * leak any buffer we might accidentially provide
+ * here.
+ *
+ * This can potentially happen during flushing.
+ */
+ if (src->ret != GST_FLOW_OK && outbuf && *outbuf) {
gst_buffer_unref (*outbuf);
- if (read_bytes < 0) {
- /* Maybe the server disconnected, retry */
- ret = GST_FLOW_CUSTOM_ERROR;
- } else {
- ret = GST_FLOW_EOS;
- src->have_body = TRUE;
- }
+ *outbuf = NULL;
}
- return ret;
+
+ return src->ret;
}
static GstFlowReturn
gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstSoupHTTPSrc *src;
- GstFlowReturn ret = GST_FLOW_OK;
- GstEvent *http_headers_event = NULL;
+ GstFlowReturn ret;
+ GstEvent *http_headers_event;
src = GST_SOUP_HTTP_SRC (psrc);
-retry:
g_mutex_lock (&src->mutex);
-
- /* Check for pending position change */
- if (src->request_position != src->read_position) {
- gst_soup_http_src_destroy_input_stream (src);
- }
-
- if (g_cancellable_is_cancelled (src->cancellable)) {
- ret = GST_FLOW_FLUSHING;
- g_mutex_unlock (&src->mutex);
- goto done;
- }
-
- /* If we have no open connection to the server, start one */
- if (!src->input_stream) {
- *outbuf = NULL;
- ret =
- gst_soup_http_src_do_request (src,
- src->method ? src->method : SOUP_METHOD_GET);
- http_headers_event = src->http_headers_event;
- src->http_headers_event = NULL;
- }
+ *outbuf = NULL;
+ ret =
+ gst_soup_http_src_do_request (src,
+ src->method ? src->method : SOUP_METHOD_GET, outbuf);
+ http_headers_event = src->http_headers_event;
+ src->http_headers_event = NULL;
g_mutex_unlock (&src->mutex);
- if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) {
- if (http_headers_event) {
- gst_pad_push_event (GST_BASE_SRC_PAD (src), http_headers_event);
- http_headers_event = NULL;
- }
- }
+ if (http_headers_event)
+ gst_pad_push_event (GST_BASE_SRC_PAD (src), http_headers_event);
- if (ret == GST_FLOW_OK)
- ret = gst_soup_http_src_read_buffer (src, outbuf);
-
-done:
- GST_DEBUG_OBJECT (src, "Returning %d %s", ret, gst_flow_get_name (ret));
- if (ret != GST_FLOW_OK) {
- if (http_headers_event)
- gst_event_unref (http_headers_event);
-
- g_mutex_lock (&src->mutex);
- gst_soup_http_src_destroy_input_stream (src);
- g_mutex_unlock (&src->mutex);
- if (ret == GST_FLOW_CUSTOM_ERROR)
- goto retry;
- }
return ret;
}
@@ -1714,8 +1891,11 @@ gst_soup_http_src_unlock (GstBaseSrc * bsrc)
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock()");
+ src->interrupted = TRUE;
src->ret = GST_FLOW_FLUSHING;
- gst_soup_http_src_cancel_message (src);
+ if (src->loop)
+ g_main_loop_quit (src->loop);
+ g_cond_signal (&src->request_finished_cond);
return TRUE;
}
@@ -1728,8 +1908,8 @@ gst_soup_http_src_unlock_stop (GstBaseSrc * bsrc)
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock_stop()");
+ src->interrupted = FALSE;
src->ret = GST_FLOW_OK;
- g_cancellable_reset (src->cancellable);
return TRUE;
}
@@ -1761,14 +1941,14 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src)
*/
if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) {
g_mutex_lock (&src->mutex);
- while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable)
- && ret == GST_FLOW_OK) {
- if ((src->msg && src->msg->method != SOUP_METHOD_HEAD)) {
+ while (!src->got_headers && !src->interrupted && ret == GST_FLOW_OK) {
+ if ((src->msg && src->msg->method != SOUP_METHOD_HEAD) &&
+ src->session_io_status != GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE) {
/* wait for the current request to finish */
- g_cond_wait (&src->have_headers_cond, &src->mutex);
+ g_cond_wait (&src->request_finished_cond, &src->mutex);
} else {
if (gst_soup_http_src_session_open (src)) {
- ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD);
+ ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD, NULL);
}
}
}
@@ -1776,6 +1956,8 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src)
/* A HEAD request shouldn't lead to EOS */
src->ret = GST_FLOW_OK;
}
+ /* resets status to idle */
+ gst_soup_http_src_cancel_message (src);
g_mutex_unlock (&src->mutex);
}
}
diff --git a/ext/soup/gstsouphttpsrc.h b/ext/soup/gstsouphttpsrc.h
index a5e259b1c..71725817e 100644
--- a/ext/soup/gstsouphttpsrc.h
+++ b/ext/soup/gstsouphttpsrc.h
@@ -59,9 +59,16 @@ struct _GstSoupHTTPSrc {
gchar *proxy_id; /* Authentication user id for proxy URI. */
gchar *proxy_pw; /* Authentication user password for proxy URI. */
gchar **cookies; /* HTTP request cookies. */
+ GMainContext *context; /* I/O context. */
+ GMainLoop *loop; /* Event loop. */
SoupSession *session; /* Async context. */
+ GstSoupHTTPSrcSessionIOStatus session_io_status;
+ /* Async I/O status. */
SoupMessage *msg; /* Request message. */
GstFlowReturn ret; /* Return code from callback. */
+ GstBuffer **outbuf; /* Return buffer allocated by callback. */
+ gboolean interrupted; /* Signal unlock(). */
+ gboolean retry; /* Should attempt to reconnect. */
gint retry_count; /* Number of retries since we received data */
gint max_retries; /* Maximum number of retries */
gchar *method; /* HTTP method */
@@ -87,13 +94,6 @@ struct _GstSoupHTTPSrc {
GTlsDatabase *tls_database;
GTlsInteraction *tls_interaction;
- GCancellable *cancellable;
- GInputStream *input_stream;
- gboolean has_pollable_interface;
- gboolean have_data;
- GMainContext *poll_context;
- GSource *poll_source;
-
/* Shoutcast/icecast metadata extraction handling. */
gboolean iradio_mode;
GstCaps *src_caps;
@@ -110,7 +110,7 @@ struct _GstSoupHTTPSrc {
guint timeout;
GMutex mutex;
- GCond have_headers_cond;
+ GCond request_finished_cond;
GstEvent *http_headers_event;
};