diff options
author | David Schleef <ds@schleef.org> | 2014-09-14 22:31:12 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-09-14 22:33:48 -0700 |
commit | 5ac2cd235320b8fbe7a06a677701ab8a154bc363 (patch) | |
tree | 36aec1f3cdd6f581b52d38304bb5307edfe75f99 | |
parent | 9ae7b379fa98bd693370d7b6c746a92016529e70 (diff) |
connection: fix output queue threading problems
-rw-r--r-- | rtmp/rtmpconnection.c | 47 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 3 |
2 files changed, 31 insertions, 19 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 749d657..4ad0d54 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -127,7 +127,7 @@ static void gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) { rtmpconnection->cancellable = g_cancellable_new (); - rtmpconnection->output_queue = g_queue_new (); + rtmpconnection->output_queue = g_async_queue_new (); rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new (); rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new (); @@ -182,6 +182,7 @@ gst_rtmp_connection_finalize (GObject * object) { GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object); GSocket *sock; + gpointer p; GST_DEBUG_OBJECT (rtmpconnection, "finalize"); @@ -205,7 +206,12 @@ gst_rtmp_connection_finalize (GObject * object) g_object_unref (sock); } - g_queue_free_full (rtmpconnection->output_queue, g_object_unref); + p = g_async_queue_try_pop (rtmpconnection->output_queue); + while (p) { + g_object_unref (p); + p = g_async_queue_try_pop (rtmpconnection->output_queue); + } + g_async_queue_unref (rtmpconnection->output_queue); gst_rtmp_chunk_cache_free (rtmpconnection->input_chunk_cache); gst_rtmp_chunk_cache_free (rtmpconnection->output_chunk_cache); @@ -228,7 +234,7 @@ gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc, { GInputStream *is; - sc->thread = g_thread_self(); + sc->thread = g_thread_self (); sc->connection = connection; /* refs the socket because it's creating an input stream, which holds a ref */ @@ -245,8 +251,8 @@ gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc, void gst_rtmp_connection_close (GstRtmpConnection * connection) { - if (connection->thread != g_thread_self()) { - GST_ERROR("Called from wrong thread"); + if (connection->thread != g_thread_self ()) { + GST_ERROR ("Called from wrong thread"); } g_cancellable_cancel (connection->cancellable); @@ -272,13 +278,17 @@ gst_rtmp_connection_start_output (GstRtmpConnection * sc) if (!sc->handshake_complete) return; + /* FIXME needs mutex */ + if (sc->output_source) + return; + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); sc->output_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (os), sc->cancellable); g_source_set_callback (sc->output_source, (GSourceFunc) gst_rtmp_connection_output_ready, sc, NULL); - g_source_attach (sc->output_source, NULL); + g_source_attach (sc->output_source, sc->output_main_context); } static gboolean @@ -354,6 +364,8 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) GST_DEBUG ("output ready"); + sc->output_source = NULL; + if (sc->writing) return G_SOURCE_REMOVE; @@ -362,7 +374,7 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) } else { GstRtmpChunkCacheEntry *entry; - chunk = g_queue_pop_head (sc->output_queue); + chunk = g_async_queue_try_pop (sc->output_queue); if (!chunk) { return G_SOURCE_REMOVE; } @@ -770,10 +782,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); - if (connection->thread != g_thread_self()) { - GST_ERROR("Called from wrong thread"); - } - g_queue_push_tail (connection->output_queue, chunk); + g_async_queue_push (connection->output_queue, chunk); gst_rtmp_connection_start_output (connection); } @@ -807,8 +816,8 @@ void gst_rtmp_connection_start_handshake (GstRtmpConnection * connection, gboolean is_server) { - if (connection->thread != g_thread_self()) { - GST_ERROR("Called from wrong thread"); + if (connection->thread != g_thread_self ()) { + GST_ERROR ("Called from wrong thread"); } if (is_server) { gst_rtmp_connection_set_input_callback (connection, @@ -942,6 +951,8 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj, gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); } + + sc->output_main_context = g_main_context_ref_thread_default (); gst_rtmp_connection_start_output (sc); } @@ -949,7 +960,7 @@ void gst_rtmp_connection_dump (GstRtmpConnection * connection) { g_print (" output_queue: %d\n", - g_queue_get_length (connection->output_queue)); + g_async_queue_length (connection->output_queue)); g_print (" input_bytes: %" G_GSIZE_FORMAT "\n", connection->input_bytes ? g_bytes_get_size (connection->input_bytes) : 0); g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes); @@ -964,8 +975,8 @@ gst_rtmp_connection_send_command (GstRtmpConnection * connection, { GstRtmpChunk *chunk; - if (connection->thread != g_thread_self()) { - GST_ERROR("Called from wrong thread"); + if (connection->thread != g_thread_self ()) { + GST_ERROR ("Called from wrong thread"); } chunk = gst_rtmp_chunk_new (); chunk->chunk_stream_id = chunk_stream_id; @@ -1004,8 +1015,8 @@ gst_rtmp_connection_send_command2 (GstRtmpConnection * connection, { GstRtmpChunk *chunk; - if (connection->thread != g_thread_self()) { - GST_ERROR("Called from wrong thread"); + if (connection->thread != g_thread_self ()) { + GST_ERROR ("Called from wrong thread"); } chunk = gst_rtmp_chunk_new (); chunk->chunk_stream_id = chunk_stream_id; diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index c84cead..2fa847f 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -54,9 +54,10 @@ struct _GstRtmpConnection GCancellable *cancellable; int state; GSocketClient *socket_client; - GQueue *output_queue; + GAsyncQueue *output_queue; GSimpleAsyncResult *async; gboolean writing; + GMainContext *output_main_context; GSource *input_source; GSource *output_source; |