summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-09-14 22:31:12 -0700
committerDavid Schleef <ds@schleef.org>2014-09-14 22:33:48 -0700
commit5ac2cd235320b8fbe7a06a677701ab8a154bc363 (patch)
tree36aec1f3cdd6f581b52d38304bb5307edfe75f99
parent9ae7b379fa98bd693370d7b6c746a92016529e70 (diff)
connection: fix output queue threading problems
-rw-r--r--rtmp/rtmpconnection.c47
-rw-r--r--rtmp/rtmpconnection.h3
2 files changed, 31 insertions, 19 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 749d657..4ad0d54 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -127,7 +127,7 @@ static void
gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->cancellable = g_cancellable_new ();
- rtmpconnection->output_queue = g_queue_new ();
+ rtmpconnection->output_queue = g_async_queue_new ();
rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new ();
rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new ();
@@ -182,6 +182,7 @@ gst_rtmp_connection_finalize (GObject * object)
{
GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
GSocket *sock;
+ gpointer p;
GST_DEBUG_OBJECT (rtmpconnection, "finalize");
@@ -205,7 +206,12 @@ gst_rtmp_connection_finalize (GObject * object)
g_object_unref (sock);
}
- g_queue_free_full (rtmpconnection->output_queue, g_object_unref);
+ p = g_async_queue_try_pop (rtmpconnection->output_queue);
+ while (p) {
+ g_object_unref (p);
+ p = g_async_queue_try_pop (rtmpconnection->output_queue);
+ }
+ g_async_queue_unref (rtmpconnection->output_queue);
gst_rtmp_chunk_cache_free (rtmpconnection->input_chunk_cache);
gst_rtmp_chunk_cache_free (rtmpconnection->output_chunk_cache);
@@ -228,7 +234,7 @@ gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
{
GInputStream *is;
- sc->thread = g_thread_self();
+ sc->thread = g_thread_self ();
sc->connection = connection;
/* refs the socket because it's creating an input stream, which holds a ref */
@@ -245,8 +251,8 @@ gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
void
gst_rtmp_connection_close (GstRtmpConnection * connection)
{
- if (connection->thread != g_thread_self()) {
- GST_ERROR("Called from wrong thread");
+ if (connection->thread != g_thread_self ()) {
+ GST_ERROR ("Called from wrong thread");
}
g_cancellable_cancel (connection->cancellable);
@@ -272,13 +278,17 @@ gst_rtmp_connection_start_output (GstRtmpConnection * sc)
if (!sc->handshake_complete)
return;
+ /* FIXME needs mutex */
+ if (sc->output_source)
+ return;
+
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
sc->output_source =
g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (os),
sc->cancellable);
g_source_set_callback (sc->output_source,
(GSourceFunc) gst_rtmp_connection_output_ready, sc, NULL);
- g_source_attach (sc->output_source, NULL);
+ g_source_attach (sc->output_source, sc->output_main_context);
}
static gboolean
@@ -354,6 +364,8 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
GST_DEBUG ("output ready");
+ sc->output_source = NULL;
+
if (sc->writing)
return G_SOURCE_REMOVE;
@@ -362,7 +374,7 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
} else {
GstRtmpChunkCacheEntry *entry;
- chunk = g_queue_pop_head (sc->output_queue);
+ chunk = g_async_queue_try_pop (sc->output_queue);
if (!chunk) {
return G_SOURCE_REMOVE;
}
@@ -770,10 +782,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
- if (connection->thread != g_thread_self()) {
- GST_ERROR("Called from wrong thread");
- }
- g_queue_push_tail (connection->output_queue, chunk);
+ g_async_queue_push (connection->output_queue, chunk);
gst_rtmp_connection_start_output (connection);
}
@@ -807,8 +816,8 @@ void
gst_rtmp_connection_start_handshake (GstRtmpConnection * connection,
gboolean is_server)
{
- if (connection->thread != g_thread_self()) {
- GST_ERROR("Called from wrong thread");
+ if (connection->thread != g_thread_self ()) {
+ GST_ERROR ("Called from wrong thread");
}
if (is_server) {
gst_rtmp_connection_set_input_callback (connection,
@@ -942,6 +951,8 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
}
+
+ sc->output_main_context = g_main_context_ref_thread_default ();
gst_rtmp_connection_start_output (sc);
}
@@ -949,7 +960,7 @@ void
gst_rtmp_connection_dump (GstRtmpConnection * connection)
{
g_print (" output_queue: %d\n",
- g_queue_get_length (connection->output_queue));
+ g_async_queue_length (connection->output_queue));
g_print (" input_bytes: %" G_GSIZE_FORMAT "\n",
connection->input_bytes ? g_bytes_get_size (connection->input_bytes) : 0);
g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes);
@@ -964,8 +975,8 @@ gst_rtmp_connection_send_command (GstRtmpConnection * connection,
{
GstRtmpChunk *chunk;
- if (connection->thread != g_thread_self()) {
- GST_ERROR("Called from wrong thread");
+ if (connection->thread != g_thread_self ()) {
+ GST_ERROR ("Called from wrong thread");
}
chunk = gst_rtmp_chunk_new ();
chunk->chunk_stream_id = chunk_stream_id;
@@ -1004,8 +1015,8 @@ gst_rtmp_connection_send_command2 (GstRtmpConnection * connection,
{
GstRtmpChunk *chunk;
- if (connection->thread != g_thread_self()) {
- GST_ERROR("Called from wrong thread");
+ if (connection->thread != g_thread_self ()) {
+ GST_ERROR ("Called from wrong thread");
}
chunk = gst_rtmp_chunk_new ();
chunk->chunk_stream_id = chunk_stream_id;
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index c84cead..2fa847f 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -54,9 +54,10 @@ struct _GstRtmpConnection
GCancellable *cancellable;
int state;
GSocketClient *socket_client;
- GQueue *output_queue;
+ GAsyncQueue *output_queue;
GSimpleAsyncResult *async;
gboolean writing;
+ GMainContext *output_main_context;
GSource *input_source;
GSource *output_source;