diff options
Diffstat (limited to 'rtmp/rtmpconnection.c')
-rw-r--r-- | rtmp/rtmpconnection.c | 93 |
1 files changed, 75 insertions, 18 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 625f721..6e37caa 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -104,6 +104,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) { rtmpconnection->cancellable = g_cancellable_new (); rtmpconnection->output_queue = g_queue_new (); + rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new (); + rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new (); } void @@ -156,6 +158,8 @@ gst_rtmp_connection_finalize (GObject * object) GST_DEBUG_OBJECT (rtmpconnection, "finalize"); /* clean up object here */ + gst_rtmp_chunk_cache_free (rtmpconnection->input_chunk_cache); + gst_rtmp_chunk_cache_free (rtmpconnection->output_chunk_cache); G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object); } @@ -177,7 +181,6 @@ gst_rtmp_connection_new (GSocketConnection * connection) (GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL); g_source_attach (sc->input_source, NULL); - return sc; } @@ -209,15 +212,22 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) GST_DEBUG ("input ready"); - data = g_malloc (128); + data = g_malloc (4096); ret = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is), - data, 128, sc->cancellable, &error); + data, 4096, sc->cancellable, &error); if (ret < 0) { GST_ERROR ("read error: %s", error->message); g_error_free (error); return G_SOURCE_REMOVE; } + if (ret == 0) { + /* FIXME probably closed */ + GST_ERROR ("closed?"); + return G_SOURCE_REMOVE; + } + + GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", ret); if (sc->input_bytes) { sc->input_bytes = gst_rtmp_bytes_append (sc->input_bytes, data, ret); @@ -225,15 +235,18 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) sc->input_bytes = g_bytes_new_take (data, ret); } - if (sc->input_callback) { - if (g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) { - GstRtmpConnectionCallback callback; - GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", - g_bytes_get_size (sc->input_bytes)); - callback = sc->input_callback; - sc->input_callback = NULL; - (*callback) (sc); - } + //GST_ERROR("ic: %p", sc->input_callback); + //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); + GST_ERROR ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes); + + while (sc->input_callback && sc->input_bytes && + g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) { + GstRtmpConnectionCallback callback; + GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", + g_bytes_get_size (sc->input_bytes)); + callback = sc->input_callback; + sc->input_callback = NULL; + (*callback) (sc); } return G_SOURCE_CONTINUE; @@ -246,10 +259,10 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) GstRtmpChunk *chunk; GBytes *bytes; - GST_ERROR ("output ready"); + GST_DEBUG ("output ready"); if (sc->writing) { - GST_ERROR ("busy writing"); + GST_DEBUG ("busy writing"); return G_SOURCE_REMOVE; } @@ -257,11 +270,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) if (!chunk) { return G_SOURCE_REMOVE; } + GST_ERROR ("popped"); sc->writing = TRUE; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - bytes = gst_rtmp_chunk_serialize (chunk); - //gst_rtmp_dump_data (bytes); + bytes = gst_rtmp_chunk_serialize (chunk, sc->output_chunk_cache); + gst_rtmp_chunk_cache_update (sc->output_chunk_cache, chunk); + gst_rtmp_dump_data (bytes); g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); @@ -407,6 +422,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc) GST_ERROR ("server handshake finished"); sc->handshake_complete = TRUE; + if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) { + GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, + g_bytes_get_size (sc->input_bytes)); + gst_rtmp_connection_chunk_callback (sc); + } + gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); @@ -417,7 +438,7 @@ static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) { GstRtmpChunk *chunk; - gsize size; + gsize size = 0; while (1) { GBytes *bytes; @@ -425,17 +446,23 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) if (sc->input_bytes == NULL) break; - chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size); + chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size, + sc->input_chunk_cache); if (chunk == NULL) break; + gst_rtmp_chunk_cache_update (sc->input_chunk_cache, chunk); + bytes = gst_rtmp_connection_take_input_bytes (sc, size); g_bytes_unref (bytes); GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length); g_signal_emit_by_name (sc, "got-chunk", chunk); g_object_unref (chunk); + + size = 0; } + GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT, size); gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, size); } @@ -447,6 +474,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); + GST_ERROR ("queued"); chunk->priv = connection; g_queue_push_tail (connection->output_queue, chunk); gst_rtmp_connection_start_output (connection); @@ -464,6 +492,18 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection, if (needed_bytes == 0) { connection->input_needed_bytes = 1; } + + if (connection->input_callback && connection->input_bytes && + g_bytes_get_size (connection->input_bytes) >= + connection->input_needed_bytes) { + GstRtmpConnectionCallback callback; + GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", + g_bytes_get_size (connection->input_bytes)); + callback = connection->input_callback; + connection->input_callback = NULL; + (*callback) (connection); + } + } void @@ -581,7 +621,24 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj, GST_ERROR ("client handshake finished"); sc->handshake_complete = TRUE; + if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) { + GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, + g_bytes_get_size (sc->input_bytes)); + gst_rtmp_connection_chunk_callback (sc); + } + gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); gst_rtmp_connection_start_output (sc); } + +void +gst_rtmp_connection_dump (GstRtmpConnection * connection) +{ + g_print (" output_queue: %d\n", + g_queue_get_length (connection->output_queue)); + g_print (" input_bytes: %" G_GSIZE_FORMAT "\n", + connection->input_bytes ? g_bytes_get_size (connection->input_bytes) : 0); + g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes); + +} |