diff options
Diffstat (limited to 'rtmp/rtmpconnection.c')
-rw-r--r-- | rtmp/rtmpconnection.c | 378 |
1 files changed, 225 insertions, 153 deletions
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 86181bd..6a217ce 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -25,6 +25,7 @@ #include "rtmpconnection.h" #include "rtmpchunk.h" #include "amf.h" +#include "rtmputils.h" #include <string.h> @@ -39,9 +40,44 @@ static void gst_rtmp_connection_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp_connection_dispose (GObject * object); static void gst_rtmp_connection_finalize (GObject * object); +static void gst_rtmp_connection_client_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc); +static void gst_rtmp_connection_client_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_client_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_client_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc); +static void gst_rtmp_connection_client_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_client_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc); +static void gst_rtmp_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_write_chunk (GstRtmpConnection * sc); +static void gst_rtmp_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_server_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_server_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc); +static void gst_rtmp_connection_server_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); + -static void proxy_connect (GstRtmpConnection * sc); -static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); enum { @@ -75,6 +111,7 @@ static void gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) { rtmpconnection->cancellable = g_cancellable_new (); + rtmpconnection->output_queue = g_queue_new (); } void @@ -139,85 +176,12 @@ gst_rtmp_connection_new (GSocketConnection * connection) sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); sc->connection = connection; - //proxy_connect (sc); - //gst_rtmp_connection_read_chunk (sc); - gst_rtmp_connection_handshake1 (sc); - return sc; } -typedef struct _ChunkRead ChunkRead; -struct _ChunkRead -{ - gpointer data; - gsize alloc_size; - gsize size; - GstRtmpConnection *connection; -}; - - -static void proxy_connect (GstRtmpConnection * sc); -static void proxy_connect_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc); -static void gst_rtmp_connection_read_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk); -static void proxy_write_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void proxy_read_chunk (GstRtmpConnection * sc); -static void proxy_read_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void gst_rtmp_connection_write_chunk (GstRtmpConnection * - sc, ChunkRead * chunk); -static void gst_rtmp_connection_write_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc); -static void gst_rtmp_connection_handshake1_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, - GBytes * bytes); -static void gst_rtmp_connection_handshake2_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_connection_handshake3 (GstRtmpConnection * sc); -static void gst_rtmp_connection_handshake3_done (GObject * obj, - GAsyncResult * res, gpointer user_data); static void -proxy_connect (GstRtmpConnection * sc) -{ - GSocketConnectable *addr; - - GST_ERROR ("proxy_connect"); - - addr = g_network_address_new ("localhost", 1935); - - sc->socket_client = g_socket_client_new (); - g_socket_client_connect_async (sc->socket_client, addr, - sc->cancellable, proxy_connect_done, sc); -} - -static void -proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data) -{ - GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); - GError *error = NULL; - - GST_ERROR ("proxy_connect_done"); - - sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client, - res, &error); - if (sc->proxy_connection == NULL) { - GST_ERROR ("connection error: %s", error->message); - g_error_free (error); - return; - } - - gst_rtmp_connection_read_chunk (sc); -} - -static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc) { GInputStream *is; @@ -272,7 +236,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj, gsize chunk_size; GBytes *bytes; - GST_ERROR ("gst_rtmp_connection_read_chunk_done"); + GST_DEBUG ("gst_rtmp_connection_read_chunk_done"); bytes = g_input_stream_read_bytes_finish (is, res, &error); if (bytes == NULL) { @@ -282,6 +246,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj, } GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + gst_rtmp_dump_data (bytes); chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size); if (chunk) { @@ -289,133 +254,152 @@ gst_rtmp_connection_read_chunk_done (GObject * obj, g_object_unref (chunk); } + + gst_rtmp_connection_read_chunk (connection); } -G_GNUC_UNUSED static void -proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk) +static void +gst_rtmp_connection_write_chunk (GstRtmpConnection * sc) { GOutputStream *os; + GBytes *bytes; + GstRtmpChunk *chunk; + + if (sc->writing) { + GST_WARNING ("gst_rtmp_connection_write_chunk while writing == TRUE"); + return; + } - os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection)); + chunk = g_queue_pop_head (sc->output_queue); + if (!chunk) { + return; + } - g_output_stream_write_async (os, chunk->data, chunk->size, - G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk); + sc->writing = TRUE; + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + bytes = gst_rtmp_chunk_serialize (chunk); + gst_rtmp_dump_data (bytes); + g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, + sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); } static void -proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data) +gst_rtmp_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpChunk *chunk = GST_RTMP_CHUNK (user_data); + GstRtmpConnection *connection = GST_RTMP_CONNECTION (chunk->priv); GError *error = NULL; gssize ret; - GST_ERROR ("proxy_write_done"); + GST_DEBUG ("gst_rtmp_connection_write_chunk_done"); + + connection->writing = FALSE; - ret = g_output_stream_write_finish (os, res, &error); + ret = g_output_stream_write_bytes_finish (os, res, &error); if (ret < 0) { GST_ERROR ("write error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret); - - proxy_read_chunk (chunk->connection); + GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); + g_object_unref (chunk); - g_free (chunk->data); - g_free (chunk); + gst_rtmp_connection_write_chunk (connection); } static void -proxy_read_chunk (GstRtmpConnection * sc) +gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc) { GInputStream *is; - ChunkRead *chunk; - - chunk = g_malloc0 (sizeof (ChunkRead)); - chunk->alloc_size = 4096; - chunk->data = g_malloc (chunk->alloc_size); - chunk->connection = sc; - is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection)); + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_async (is, chunk->data, chunk->alloc_size, - G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk); + g_input_stream_read_bytes_async (is, 4096, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_server_handshake1_done, sc); } static void -proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data) +gst_rtmp_connection_server_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); GError *error = NULL; - gssize ret; + GBytes *bytes; - GST_ERROR ("proxy_read_done"); + GST_DEBUG ("gst_rtmp_connection_server_handshake1_done"); - ret = g_input_stream_read_finish (is, res, &error); - if (ret < 0) { + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { GST_ERROR ("read error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret); - chunk->size = ret; + GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); - gst_rtmp_connection_write_chunk (chunk->connection, chunk); + gst_rtmp_connection_server_handshake2 (sc, bytes); } static void -gst_rtmp_connection_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk) +gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc, GBytes * bytes) { GOutputStream *os; + guint8 *data; + GBytes *out_bytes; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - g_output_stream_write_async (os, chunk->data, chunk->size, + data = g_malloc (1 + 1536 + 1536); + memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); + memset (data + 1537, 0, 8); + memset (data + 1537 + 8, 0xef, 1528); + + out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); + + g_output_stream_write_bytes_async (os, out_bytes, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_write_chunk_done, chunk); + gst_rtmp_connection_server_handshake2_done, sc); } static void -gst_rtmp_connection_write_chunk_done (GObject * obj, +gst_rtmp_connection_server_handshake2_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); GError *error = NULL; gssize ret; - GST_ERROR ("gst_rtmp_connection_write_chunk_done"); + GST_DEBUG ("gst_rtmp_connection_server_handshake2_done"); - ret = g_output_stream_write_finish (os, res, &error); - if (ret < 0) { - GST_ERROR ("write error: %s", error->message); + ret = g_output_stream_write_bytes_finish (os, res, &error); + if (ret < 1 + 1536 + 1536) { + GST_ERROR ("read error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); - - gst_rtmp_connection_read_chunk (chunk->connection); + GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); - g_free (chunk->data); - g_free (chunk); + gst_rtmp_connection_server_handshake3 (sc); } static void -gst_rtmp_connection_handshake1 (GstRtmpConnection * sc) +gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc) { GInputStream *is; is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_bytes_async (is, 4096, + g_input_stream_read_bytes_async (is, 1536, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_handshake1_done, sc); + gst_rtmp_connection_server_handshake3_done, sc); } static void -gst_rtmp_connection_handshake1_done (GObject * obj, +gst_rtmp_connection_server_handshake3_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); @@ -423,7 +407,7 @@ gst_rtmp_connection_handshake1_done (GObject * obj, GError *error = NULL; GBytes *bytes; - GST_ERROR ("gst_rtmp_connection_handshake1_done"); + GST_DEBUG ("gst_rtmp_connection_server_handshake3_done"); bytes = g_input_stream_read_bytes_finish (is, res, &error); if (bytes == NULL) { @@ -431,34 +415,68 @@ gst_rtmp_connection_handshake1_done (GObject * obj, g_error_free (error); return; } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_connection_read_chunk (sc); +} + +void +gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, + GstRtmpChunk * chunk) +{ + g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); + g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); - gst_rtmp_connection_handshake2 (sc, bytes); + chunk->priv = connection; + g_queue_push_tail (connection->output_queue, chunk); + if (!connection->writing) { + gst_rtmp_connection_write_chunk (connection); + } +} + +void +gst_rtmp_connection_handshake_async (GstRtmpConnection * connection, + gboolean is_server, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + GSimpleAsyncResult *async; + + async = g_simple_async_result_new (G_OBJECT (connection), + callback, user_data, gst_rtmp_connection_handshake_async); + g_simple_async_result_set_check_cancellable (async, cancellable); + + connection->cancellable = cancellable; + connection->async = async; + + if (is_server) { + gst_rtmp_connection_server_handshake1 (connection); + } else { + gst_rtmp_connection_client_handshake1 (connection); + } } static void -gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, GBytes * bytes) +gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc) { GOutputStream *os; guint8 *data; - GBytes *out_bytes; + GBytes *bytes; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - data = g_malloc (1 + 1536 + 1536); - memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); - memset (data + 1537, 0, 8); - memset (data + 1537 + 8, 0xef, 1528); - - out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); + data = g_malloc (1 + 1536); + data[0] = 3; + memset (data + 1, 0, 8); + memset (data + 9, 0xa5, 1528); + bytes = g_bytes_new_take (data, 1 + 1536); - g_output_stream_write_bytes_async (os, out_bytes, + g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_handshake2_done, sc); + gst_rtmp_connection_client_handshake1_done, sc); } static void -gst_rtmp_connection_handshake2_done (GObject * obj, +gst_rtmp_connection_client_handshake1_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); @@ -466,33 +484,33 @@ gst_rtmp_connection_handshake2_done (GObject * obj, GError *error = NULL; gssize ret; - GST_ERROR ("gst_rtmp_connection_handshake2_done"); + GST_DEBUG ("gst_rtmp_connection_client_handshake2_done"); ret = g_output_stream_write_bytes_finish (os, res, &error); - if (ret < 1 + 1536 + 1536) { - GST_ERROR ("read error: %s", error->message); + if (ret < 1 + 1536) { + GST_ERROR ("write error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret); + GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); - gst_rtmp_connection_handshake3 (sc); + gst_rtmp_connection_client_handshake2 (sc); } static void -gst_rtmp_connection_handshake3 (GstRtmpConnection * sc) +gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc) { GInputStream *is; is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_bytes_async (is, 1536, + g_input_stream_read_bytes_async (is, 1 + 1536 + 1536, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_handshake3_done, sc); + gst_rtmp_connection_client_handshake2_done, sc); } static void -gst_rtmp_connection_handshake3_done (GObject * obj, +gst_rtmp_connection_client_handshake2_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); @@ -500,7 +518,7 @@ gst_rtmp_connection_handshake3_done (GObject * obj, GError *error = NULL; GBytes *bytes; - GST_ERROR ("gst_rtmp_connection_handshake3_done"); + GST_DEBUG ("gst_rtmp_connection_client_handshake2_done"); bytes = g_input_stream_read_bytes_finish (is, res, &error); if (bytes == NULL) { @@ -508,8 +526,62 @@ gst_rtmp_connection_handshake3_done (GObject * obj, g_error_free (error); return; } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_connection_client_handshake3 (sc, bytes); +} + +static void +gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, GBytes * bytes) +{ + GOutputStream *os; + GBytes *out_bytes; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + + out_bytes = g_bytes_new_from_bytes (bytes, 1 + 1536, 1536); + + g_output_stream_write_bytes_async (os, out_bytes, G_PRIORITY_DEFAULT, + sc->cancellable, gst_rtmp_connection_client_handshake3_done, sc); +} + +static void +gst_rtmp_connection_client_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + gssize ret; + + GST_DEBUG ("gst_rtmp_connection_client_handshake3_done"); + + ret = g_output_stream_write_bytes_finish (os, res, &error); + if (ret < 1536) { + GST_ERROR ("write error: %s", error->message); + g_error_free (error); + return; + } + GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); + + g_simple_async_result_complete (sc->async); gst_rtmp_connection_read_chunk (sc); - (void) &proxy_connect; +} + +gboolean +gst_rtmp_connection_handshake_finish (GstRtmpConnection * connection, + GAsyncResult * result, GError ** error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (g_simple_async_result_is_valid (result, + G_OBJECT (connection), gst_rtmp_connection_handshake_async), FALSE); + + simple = (GSimpleAsyncResult *) result; + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + return TRUE; } |