From 233d7e887e079b0f1c3da2366443744e54573324 Mon Sep 17 00:00:00 2001 From: David Schleef Date: Sun, 24 Aug 2014 18:18:43 -0700 Subject: hacking --- rtmp/Makefile.am | 4 +- rtmp/rtmpchunk.c | 32 +++++ rtmp/rtmpchunk.h | 2 + rtmp/rtmpclient.c | 243 +++++++------------------------- rtmp/rtmpclient.h | 13 +- rtmp/rtmpconnection.c | 378 ++++++++++++++++++++++++++++++-------------------- rtmp/rtmpconnection.h | 17 ++- rtmp/rtmpserver.c | 25 +++- rtmp/rtmputils.c | 51 +++++++ rtmp/rtmputils.h | 32 +++++ tools/proxy-server.c | 72 ++++++---- 11 files changed, 485 insertions(+), 384 deletions(-) create mode 100644 rtmp/rtmputils.c create mode 100644 rtmp/rtmputils.h diff --git a/rtmp/Makefile.am b/rtmp/Makefile.am index 18bc37c..4cab66b 100644 --- a/rtmp/Makefile.am +++ b/rtmp/Makefile.am @@ -30,4 +30,6 @@ sources = \ rtmpserver.c \ rtmpserver.h \ rtmpstream.c \ - rtmpstream.h + rtmpstream.h \ + rtmputils.c \ + rtmputils.h diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index de0f46b..7cc5f43 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -23,6 +23,7 @@ #include #include "rtmpchunk.h" +#include GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_debug_category); #define GST_CAT_DEFAULT gst_rtmp_chunk_debug_category @@ -236,6 +237,37 @@ gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size) return NULL; } +GBytes * +gst_rtmp_chunk_serialize (GstRtmpChunk * chunk) +{ + guint8 *data; + const guint8 *chunkdata; + gsize chunksize; + int header_fmt; + + /* FIXME this is incomplete and inefficient */ + chunkdata = g_bytes_get_data (chunk->payload, &chunksize); + data = g_malloc (chunksize + 12); + header_fmt = 0; + g_assert (chunk->stream_id < 64); + data[0] = (header_fmt << 6) | (chunk->stream_id); + g_assert (chunk->timestamp < 0xffffff); + data[1] = (chunk->timestamp >> 16) & 0xff; + data[2] = (chunk->timestamp >> 8) & 0xff; + data[3] = chunk->timestamp & 0xff; + data[4] = (chunk->message_length >> 16) & 0xff; + data[5] = (chunk->message_length >> 8) & 0xff; + data[6] = chunk->message_length & 0xff; + data[7] = chunk->message_type_id; + data[8] = 0; + data[9] = 0; + data[10] = 0; + data[11] = 0; + memcpy (data + 12, chunkdata, chunksize); + + return g_bytes_new_take (data, chunksize + 12); +} + void gst_rtmp_chunk_set_stream_id (GstRtmpChunk * chunk, guint32 stream_id) { diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 68edfcb..1165fb7 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -43,6 +43,7 @@ struct _GstRtmpChunk int message_type_id; GBytes *payload; + gpointer priv; }; struct _GstRtmpChunkClass @@ -63,6 +64,7 @@ GstRtmpChunk *gst_rtmp_chunk_new (void); GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes, gsize *chunk_size); GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size); +GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk); void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id); void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp); diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 9db1519..6a4f373 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -39,19 +39,10 @@ static void gst_rtmp_client_dispose (GObject * object); static void gst_rtmp_client_finalize (GObject * object); static void -gst_rtmp_client_connect_start (GstRtmpClient * client, - GCancellable * cancellable, GSimpleAsyncResult * async); -static void -gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result, - gpointer user_data); -static void -gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result, +gst_rtmp_client_connect_done (GObject * source, GAsyncResult * result, gpointer user_data); static void -gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result, - gpointer user_data); -static void -gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result, +gst_rtmp_client_handshake_done (GObject * source, GAsyncResult * result, gpointer user_data); enum @@ -167,7 +158,8 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *simple; + GSimpleAsyncResult *async; + GSocketConnectable *addr; if (client->state != GST_RTMP_CLIENT_STATE_NEW) { g_simple_async_report_error_in_idle (G_OBJECT (client), @@ -176,226 +168,89 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, return; } - simple = g_simple_async_result_new (G_OBJECT (client), + async = g_simple_async_result_new (G_OBJECT (client), callback, user_data, gst_rtmp_client_connect_async); + g_simple_async_result_set_check_cancellable (async, cancellable); - gst_rtmp_client_connect_start (client, cancellable, simple); -} - -gboolean -gst_rtmp_client_connect_finish (GstRtmpClient * client, - GAsyncResult * result, GError ** error) -{ - GSimpleAsyncResult *simple; - - g_return_val_if_fail (g_simple_async_result_is_valid (result, - G_OBJECT (client), gst_rtmp_client_connect_async), FALSE); - - simple = (GSimpleAsyncResult *) result; - - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; - - return TRUE; -} - - -/* async implementation */ - -/* connect */ - -typedef struct _ConnectContext ConnectContext; -struct _ConnectContext -{ - GstRtmpClient *client; - GSimpleAsyncResult *async; - GCancellable *cancellable; - - GSocketClient *socket_client; - GSocketConnection *connection; - guint8 *handshake_send; - guint8 *handshake_recv; -}; - -static void -gst_rtmp_client_connect_complete (ConnectContext * context) -{ - GSimpleAsyncResult *async = context->async; - - if (context->socket_client) - g_object_unref (context->socket_client); - if (context->connection) - g_object_unref (context->connection); - g_free (context->handshake_send); - g_free (context->handshake_recv); - g_free (context); - g_simple_async_result_complete (async); -} - -static void -gst_rtmp_client_connect_start (GstRtmpClient * client, - GCancellable * cancellable, GSimpleAsyncResult * async) -{ - ConnectContext *context; - GSocketConnectable *addr; - - context = g_malloc0 (sizeof (ConnectContext)); - context->cancellable = cancellable; - context->client = client; - context->async = async; + client->cancellable = cancellable; + client->async = async; addr = g_network_address_new ("localhost", 1935); - context->socket_client = g_socket_client_new (); + client->socket_client = g_socket_client_new (); - GST_ERROR ("g_socket_client_connect_async"); - g_socket_client_connect_async (context->socket_client, addr, - context->cancellable, gst_rtmp_client_connect_1, context); + GST_DEBUG ("g_socket_client_connect_async"); + g_socket_client_connect_async (client->socket_client, addr, + client->cancellable, gst_rtmp_client_connect_done, client); } - static void -gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result, +gst_rtmp_client_connect_done (GObject * source, GAsyncResult * result, gpointer user_data) { - ConnectContext *context = user_data; + GstRtmpClient *client = GST_RTMP_CLIENT (user_data); GError *error = NULL; - GOutputStream *os; - GST_ERROR ("g_socket_client_connect_finish"); - context->connection = g_socket_client_connect_finish (context->socket_client, - result, &error); - if (context->connection == NULL) { + GST_DEBUG ("g_socket_client_connect_done"); + client->socket_connection = + g_socket_client_connect_finish (client->socket_client, result, &error); + if (client->socket_connection == NULL) { GST_ERROR ("error"); - g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + g_simple_async_result_set_error (client->async, GST_RTMP_ERROR, GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); g_error_free (error); - gst_rtmp_client_connect_complete (context); + client->cancellable = NULL; + g_simple_async_result_complete (client->async); return; } - context->handshake_send = g_malloc (1537); - context->handshake_send[0] = 3; - memset (context->handshake_send + 1, 0, 8); - memset (context->handshake_send + 9, 0xa5, 1528); - - os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection)); - - GST_ERROR ("g_output_stream_write_async"); - g_output_stream_write_async (os, context->handshake_send, 1537, - G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_2, - context); + client->connection = gst_rtmp_connection_new (client->socket_connection); + gst_rtmp_connection_handshake_async (client->connection, FALSE, + client->cancellable, gst_rtmp_client_handshake_done, client); } static void -gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result, +gst_rtmp_client_handshake_done (GObject * source, GAsyncResult * result, gpointer user_data) { - ConnectContext *context = user_data; - GOutputStream *os; - GInputStream *is; + GstRtmpClient *client = GST_RTMP_CLIENT (user_data); GError *error = NULL; - gssize n; + gboolean ret; - GST_ERROR ("g_output_stream_write_finish"); - os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection)); - n = g_output_stream_write_finish (os, result, &error); - GST_ERROR ("wrote %" G_GSIZE_FORMAT " bytes", n); - if (error) { - GST_ERROR ("error"); - g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + GST_DEBUG ("g_socket_client_connect_done"); + ret = gst_rtmp_connection_handshake_finish (client->connection, + result, &error); + if (!ret) { + g_simple_async_result_set_error (client->async, GST_RTMP_ERROR, GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); g_error_free (error); - gst_rtmp_client_connect_complete (context); + client->cancellable = NULL; + g_simple_async_result_complete (client->async); return; } - context->handshake_recv = g_malloc (1537); - - GST_ERROR ("g_input_stream_read_async"); - is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); - g_input_stream_read_async (is, context->handshake_recv, 1537, - G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_3, - context); + client->cancellable = NULL; + g_simple_async_result_complete (client->async); } -static void -gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result, - gpointer user_data) +gboolean +gst_rtmp_client_connect_finish (GstRtmpClient * client, + GAsyncResult * result, GError ** error) { - ConnectContext *context = user_data; - GInputStream *is; - GError *error = NULL; - gssize n; + GSimpleAsyncResult *simple; - GST_ERROR ("g_input_stream_read_finish"); - is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); - n = g_input_stream_read_finish (is, result, &error); - GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n); - if (error) { - GST_ERROR ("error"); - g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, - GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); - g_error_free (error); - gst_rtmp_client_connect_complete (context); - return; - } -#if 0 - GST_ERROR ("recv: %02x %02x %02x %02x %02x %02x %02x %02x %02x", - context->handshake_recv[0], context->handshake_recv[1], - context->handshake_recv[2], context->handshake_recv[3], - context->handshake_recv[4], context->handshake_recv[5], - context->handshake_recv[6], context->handshake_recv[7], - context->handshake_recv[8]); -#endif + g_return_val_if_fail (g_simple_async_result_is_valid (result, + G_OBJECT (client), gst_rtmp_client_connect_async), FALSE); - n = g_output_stream_write (g_io_stream_get_output_stream (G_IO_STREAM - (context->connection)), context->handshake_recv + 1, 1536, - context->cancellable, &error); - if (n < 1536) { - GST_ERROR ("error"); - g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, - GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); - g_error_free (error); - gst_rtmp_client_connect_complete (context); - return; - } + simple = (GSimpleAsyncResult *) result; - GST_ERROR ("g_input_stream_read_async"); - is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); - g_input_stream_read_async (is, context->handshake_recv + 1, 1536, - G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_4, - context); + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + return TRUE; } -static void -gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result, - gpointer user_data) +GstRtmpConnection * +gst_rtmp_client_get_connection (GstRtmpClient * client) { - ConnectContext *context = user_data; - GInputStream *is; - GError *error = NULL; - gssize n; - - GST_ERROR ("g_input_stream_read_finish"); - is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); - n = g_input_stream_read_finish (is, result, &error); - GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n); - if (error) { - GST_ERROR ("error"); - g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, - GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); - g_error_free (error); - gst_rtmp_client_connect_complete (context); - return; - } - - context->client->socket_client = context->socket_client; - context->socket_client = NULL; - context->client->connection = context->connection; - context->connection = NULL; - context->client->state = GST_RTMP_CLIENT_STATE_CONNECTED; - - GST_ERROR ("got here"); - gst_rtmp_client_connect_complete (context); + return client->connection; } diff --git a/rtmp/rtmpclient.h b/rtmp/rtmpclient.h index 8a1b8b1..3a209db 100644 --- a/rtmp/rtmpclient.h +++ b/rtmp/rtmpclient.h @@ -22,6 +22,7 @@ #include #include +#include G_BEGIN_DECLS @@ -67,9 +68,12 @@ struct _GstRtmpClient GCond cond; GMainContext *context; + GCancellable *cancellable; + GSimpleAsyncResult *async; GSocketClient *socket_client; - GSocketConnection *connection; + GSocketConnection *socket_connection; + GstRtmpConnection *connection; }; struct _GstRtmpClientClass @@ -93,12 +97,7 @@ void gst_rtmp_client_connect_async (GstRtmpClient *client, gboolean gst_rtmp_client_connect_finish (GstRtmpClient *client, GAsyncResult *result, GError **error); -void gst_rtmp_client_queue_message (GstRtmpClient *client, - GstRtmpMessage *message, GstRtmpClientMessageCallback callback, - gpointer user_data); -void gst_rtmp_client_queue_chunk (GstRtmpClient *client, - GstRtmpChunk *Chunk, GstRtmpClientChunkCallback callback, - gpointer user_data); +GstRtmpConnection *gst_rtmp_client_get_connection (GstRtmpClient *client); G_END_DECLS diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 86181bd..6a217ce 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -25,6 +25,7 @@ #include "rtmpconnection.h" #include "rtmpchunk.h" #include "amf.h" +#include "rtmputils.h" #include @@ -39,9 +40,44 @@ static void gst_rtmp_connection_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp_connection_dispose (GObject * object); static void gst_rtmp_connection_finalize (GObject * object); +static void gst_rtmp_connection_client_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc); +static void gst_rtmp_connection_client_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_client_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_client_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc); +static void gst_rtmp_connection_client_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_client_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc); +static void gst_rtmp_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_write_chunk (GstRtmpConnection * sc); +static void gst_rtmp_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_server_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_server_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc); +static void gst_rtmp_connection_server_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); + -static void proxy_connect (GstRtmpConnection * sc); -static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); enum { @@ -75,6 +111,7 @@ static void gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) { rtmpconnection->cancellable = g_cancellable_new (); + rtmpconnection->output_queue = g_queue_new (); } void @@ -139,84 +176,11 @@ gst_rtmp_connection_new (GSocketConnection * connection) sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); sc->connection = connection; - //proxy_connect (sc); - //gst_rtmp_connection_read_chunk (sc); - gst_rtmp_connection_handshake1 (sc); - return sc; } -typedef struct _ChunkRead ChunkRead; -struct _ChunkRead -{ - gpointer data; - gsize alloc_size; - gsize size; - GstRtmpConnection *connection; -}; - - -static void proxy_connect (GstRtmpConnection * sc); -static void proxy_connect_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc); -static void gst_rtmp_connection_read_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk); -static void proxy_write_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void proxy_read_chunk (GstRtmpConnection * sc); -static void proxy_read_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void gst_rtmp_connection_write_chunk (GstRtmpConnection * - sc, ChunkRead * chunk); -static void gst_rtmp_connection_write_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc); -static void gst_rtmp_connection_handshake1_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, - GBytes * bytes); -static void gst_rtmp_connection_handshake2_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_connection_handshake3 (GstRtmpConnection * sc); -static void gst_rtmp_connection_handshake3_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void -proxy_connect (GstRtmpConnection * sc) -{ - GSocketConnectable *addr; - - GST_ERROR ("proxy_connect"); - - addr = g_network_address_new ("localhost", 1935); - - sc->socket_client = g_socket_client_new (); - g_socket_client_connect_async (sc->socket_client, addr, - sc->cancellable, proxy_connect_done, sc); -} - -static void -proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data) -{ - GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); - GError *error = NULL; - - GST_ERROR ("proxy_connect_done"); - - sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client, - res, &error); - if (sc->proxy_connection == NULL) { - GST_ERROR ("connection error: %s", error->message); - g_error_free (error); - return; - } - - gst_rtmp_connection_read_chunk (sc); -} - static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc) { @@ -272,7 +236,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj, gsize chunk_size; GBytes *bytes; - GST_ERROR ("gst_rtmp_connection_read_chunk_done"); + GST_DEBUG ("gst_rtmp_connection_read_chunk_done"); bytes = g_input_stream_read_bytes_finish (is, res, &error); if (bytes == NULL) { @@ -282,6 +246,7 @@ gst_rtmp_connection_read_chunk_done (GObject * obj, } GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + gst_rtmp_dump_data (bytes); chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size); if (chunk) { @@ -289,133 +254,152 @@ gst_rtmp_connection_read_chunk_done (GObject * obj, g_object_unref (chunk); } + + gst_rtmp_connection_read_chunk (connection); } -G_GNUC_UNUSED static void -proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk) +static void +gst_rtmp_connection_write_chunk (GstRtmpConnection * sc) { GOutputStream *os; + GBytes *bytes; + GstRtmpChunk *chunk; + + if (sc->writing) { + GST_WARNING ("gst_rtmp_connection_write_chunk while writing == TRUE"); + return; + } - os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection)); + chunk = g_queue_pop_head (sc->output_queue); + if (!chunk) { + return; + } - g_output_stream_write_async (os, chunk->data, chunk->size, - G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk); + sc->writing = TRUE; + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + bytes = gst_rtmp_chunk_serialize (chunk); + gst_rtmp_dump_data (bytes); + g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, + sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); } static void -proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data) +gst_rtmp_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpChunk *chunk = GST_RTMP_CHUNK (user_data); + GstRtmpConnection *connection = GST_RTMP_CONNECTION (chunk->priv); GError *error = NULL; gssize ret; - GST_ERROR ("proxy_write_done"); + GST_DEBUG ("gst_rtmp_connection_write_chunk_done"); + + connection->writing = FALSE; - ret = g_output_stream_write_finish (os, res, &error); + ret = g_output_stream_write_bytes_finish (os, res, &error); if (ret < 0) { GST_ERROR ("write error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret); - - proxy_read_chunk (chunk->connection); + GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); + g_object_unref (chunk); - g_free (chunk->data); - g_free (chunk); + gst_rtmp_connection_write_chunk (connection); } static void -proxy_read_chunk (GstRtmpConnection * sc) +gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc) { GInputStream *is; - ChunkRead *chunk; - - chunk = g_malloc0 (sizeof (ChunkRead)); - chunk->alloc_size = 4096; - chunk->data = g_malloc (chunk->alloc_size); - chunk->connection = sc; - is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection)); + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_async (is, chunk->data, chunk->alloc_size, - G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk); + g_input_stream_read_bytes_async (is, 4096, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_server_handshake1_done, sc); } static void -proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data) +gst_rtmp_connection_server_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); GError *error = NULL; - gssize ret; + GBytes *bytes; - GST_ERROR ("proxy_read_done"); + GST_DEBUG ("gst_rtmp_connection_server_handshake1_done"); - ret = g_input_stream_read_finish (is, res, &error); - if (ret < 0) { + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { GST_ERROR ("read error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret); - chunk->size = ret; + GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); - gst_rtmp_connection_write_chunk (chunk->connection, chunk); + gst_rtmp_connection_server_handshake2 (sc, bytes); } static void -gst_rtmp_connection_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk) +gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc, GBytes * bytes) { GOutputStream *os; + guint8 *data; + GBytes *out_bytes; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - g_output_stream_write_async (os, chunk->data, chunk->size, + data = g_malloc (1 + 1536 + 1536); + memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); + memset (data + 1537, 0, 8); + memset (data + 1537 + 8, 0xef, 1528); + + out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); + + g_output_stream_write_bytes_async (os, out_bytes, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_write_chunk_done, chunk); + gst_rtmp_connection_server_handshake2_done, sc); } static void -gst_rtmp_connection_write_chunk_done (GObject * obj, +gst_rtmp_connection_server_handshake2_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); GError *error = NULL; gssize ret; - GST_ERROR ("gst_rtmp_connection_write_chunk_done"); + GST_DEBUG ("gst_rtmp_connection_server_handshake2_done"); - ret = g_output_stream_write_finish (os, res, &error); - if (ret < 0) { - GST_ERROR ("write error: %s", error->message); + ret = g_output_stream_write_bytes_finish (os, res, &error); + if (ret < 1 + 1536 + 1536) { + GST_ERROR ("read error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); - - gst_rtmp_connection_read_chunk (chunk->connection); + GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); - g_free (chunk->data); - g_free (chunk); + gst_rtmp_connection_server_handshake3 (sc); } static void -gst_rtmp_connection_handshake1 (GstRtmpConnection * sc) +gst_rtmp_connection_server_handshake3 (GstRtmpConnection * sc) { GInputStream *is; is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_bytes_async (is, 4096, + g_input_stream_read_bytes_async (is, 1536, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_handshake1_done, sc); + gst_rtmp_connection_server_handshake3_done, sc); } static void -gst_rtmp_connection_handshake1_done (GObject * obj, +gst_rtmp_connection_server_handshake3_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); @@ -423,7 +407,7 @@ gst_rtmp_connection_handshake1_done (GObject * obj, GError *error = NULL; GBytes *bytes; - GST_ERROR ("gst_rtmp_connection_handshake1_done"); + GST_DEBUG ("gst_rtmp_connection_server_handshake3_done"); bytes = g_input_stream_read_bytes_finish (is, res, &error); if (bytes == NULL) { @@ -431,34 +415,68 @@ gst_rtmp_connection_handshake1_done (GObject * obj, g_error_free (error); return; } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_connection_read_chunk (sc); +} + +void +gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, + GstRtmpChunk * chunk) +{ + g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); + g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); - gst_rtmp_connection_handshake2 (sc, bytes); + chunk->priv = connection; + g_queue_push_tail (connection->output_queue, chunk); + if (!connection->writing) { + gst_rtmp_connection_write_chunk (connection); + } +} + +void +gst_rtmp_connection_handshake_async (GstRtmpConnection * connection, + gboolean is_server, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + GSimpleAsyncResult *async; + + async = g_simple_async_result_new (G_OBJECT (connection), + callback, user_data, gst_rtmp_connection_handshake_async); + g_simple_async_result_set_check_cancellable (async, cancellable); + + connection->cancellable = cancellable; + connection->async = async; + + if (is_server) { + gst_rtmp_connection_server_handshake1 (connection); + } else { + gst_rtmp_connection_client_handshake1 (connection); + } } static void -gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, GBytes * bytes) +gst_rtmp_connection_client_handshake1 (GstRtmpConnection * sc) { GOutputStream *os; guint8 *data; - GBytes *out_bytes; + GBytes *bytes; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - data = g_malloc (1 + 1536 + 1536); - memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); - memset (data + 1537, 0, 8); - memset (data + 1537 + 8, 0xef, 1528); - - out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); + data = g_malloc (1 + 1536); + data[0] = 3; + memset (data + 1, 0, 8); + memset (data + 9, 0xa5, 1528); + bytes = g_bytes_new_take (data, 1 + 1536); - g_output_stream_write_bytes_async (os, out_bytes, + g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_handshake2_done, sc); + gst_rtmp_connection_client_handshake1_done, sc); } static void -gst_rtmp_connection_handshake2_done (GObject * obj, +gst_rtmp_connection_client_handshake1_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GOutputStream *os = G_OUTPUT_STREAM (obj); @@ -466,33 +484,33 @@ gst_rtmp_connection_handshake2_done (GObject * obj, GError *error = NULL; gssize ret; - GST_ERROR ("gst_rtmp_connection_handshake2_done"); + GST_DEBUG ("gst_rtmp_connection_client_handshake2_done"); ret = g_output_stream_write_bytes_finish (os, res, &error); - if (ret < 1 + 1536 + 1536) { - GST_ERROR ("read error: %s", error->message); + if (ret < 1 + 1536) { + GST_ERROR ("write error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret); + GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); - gst_rtmp_connection_handshake3 (sc); + gst_rtmp_connection_client_handshake2 (sc); } static void -gst_rtmp_connection_handshake3 (GstRtmpConnection * sc) +gst_rtmp_connection_client_handshake2 (GstRtmpConnection * sc) { GInputStream *is; is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_bytes_async (is, 1536, + g_input_stream_read_bytes_async (is, 1 + 1536 + 1536, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_connection_handshake3_done, sc); + gst_rtmp_connection_client_handshake2_done, sc); } static void -gst_rtmp_connection_handshake3_done (GObject * obj, +gst_rtmp_connection_client_handshake2_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); @@ -500,7 +518,7 @@ gst_rtmp_connection_handshake3_done (GObject * obj, GError *error = NULL; GBytes *bytes; - GST_ERROR ("gst_rtmp_connection_handshake3_done"); + GST_DEBUG ("gst_rtmp_connection_client_handshake2_done"); bytes = g_input_stream_read_bytes_finish (is, res, &error); if (bytes == NULL) { @@ -508,8 +526,62 @@ gst_rtmp_connection_handshake3_done (GObject * obj, g_error_free (error); return; } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + GST_DEBUG ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_connection_client_handshake3 (sc, bytes); +} + +static void +gst_rtmp_connection_client_handshake3 (GstRtmpConnection * sc, GBytes * bytes) +{ + GOutputStream *os; + GBytes *out_bytes; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + + out_bytes = g_bytes_new_from_bytes (bytes, 1 + 1536, 1536); + + g_output_stream_write_bytes_async (os, out_bytes, G_PRIORITY_DEFAULT, + sc->cancellable, gst_rtmp_connection_client_handshake3_done, sc); +} + +static void +gst_rtmp_connection_client_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + gssize ret; + + GST_DEBUG ("gst_rtmp_connection_client_handshake3_done"); + + ret = g_output_stream_write_bytes_finish (os, res, &error); + if (ret < 1536) { + GST_ERROR ("write error: %s", error->message); + g_error_free (error); + return; + } + GST_DEBUG ("wrote %" G_GSSIZE_FORMAT " bytes", ret); + + g_simple_async_result_complete (sc->async); gst_rtmp_connection_read_chunk (sc); - (void) &proxy_connect; +} + +gboolean +gst_rtmp_connection_handshake_finish (GstRtmpConnection * connection, + GAsyncResult * result, GError ** error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (g_simple_async_result_is_valid (result, + G_OBJECT (connection), gst_rtmp_connection_handshake_async), FALSE); + + simple = (GSimpleAsyncResult *) result; + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + return TRUE; } diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index 8713147..cde9e58 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -38,12 +38,18 @@ struct _GstRtmpConnection { GObject object; + /* should be properties */ + gboolean input_paused; + /* private */ GSocketConnection *connection; GSocketConnection *proxy_connection; GCancellable *cancellable; int state; GSocketClient *socket_client; + GQueue *output_queue; + GSimpleAsyncResult *async; + gboolean writing; }; struct _GstRtmpConnectionClass @@ -57,8 +63,15 @@ struct _GstRtmpConnectionClass GType gst_rtmp_connection_get_type (void); -GstRtmpConnection *gst_rtmp_connection_new ( - GSocketConnection *connection); +GstRtmpConnection *gst_rtmp_connection_new (GSocketConnection *connection); + +void gst_rtmp_connection_handshake_async (GstRtmpConnection *connection, + gboolean is_server, GCancellable *cancellable, + GAsyncReadyCallback callback, gpointer user_data); +gboolean gst_rtmp_connection_handshake_finish (GstRtmpConnection *connection, + GAsyncResult *result, GError **error); +void gst_rtmp_connection_queue_chunk (GstRtmpConnection *connection, + GstRtmpChunk *chunk); G_END_DECLS diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c index e35bff6..e9527c8 100644 --- a/rtmp/rtmpserver.c +++ b/rtmp/rtmpserver.c @@ -38,6 +38,9 @@ static void gst_rtmp_server_finalize (GObject * object); static gboolean gst_rtmp_server_incoming (GSocketService * service, GSocketConnection * connection, GObject * source_object, gpointer user_data); +static void +gst_rtmp_server_handshake_done (GObject * source, GAsyncResult * result, + gpointer user_data); enum @@ -179,15 +182,35 @@ gst_rtmp_server_incoming (GSocketService * service, GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data); GstRtmpConnection *connection; - GST_ERROR ("client connected"); + GST_INFO ("client connected"); g_object_ref (socket_connection); connection = gst_rtmp_connection_new (socket_connection); gst_rtmp_server_add_connection (rtmpserver, connection); + gst_rtmp_connection_handshake_async (connection, TRUE, + NULL, gst_rtmp_server_handshake_done, rtmpserver); return TRUE; } +static void +gst_rtmp_server_handshake_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + //GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data); + GstRtmpConnection *connection = GST_RTMP_CONNECTION (source); + GError *error = NULL; + gboolean ret; + + GST_ERROR ("handshake done"); + + ret = gst_rtmp_connection_handshake_finish (connection, result, &error); + if (!ret) { + g_error_free (error); + return; + } +} + void gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver, GstRtmpConnection * connection) diff --git a/rtmp/rtmputils.c b/rtmp/rtmputils.c new file mode 100644 index 0000000..3e4a1b5 --- /dev/null +++ b/rtmp/rtmputils.c @@ -0,0 +1,51 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "rtmputils.h" + + +void +gst_rtmp_dump_data (GBytes * bytes) +{ + const guint8 *data; + gsize size; + int i, j; + + data = g_bytes_get_data (bytes, &size); + for (i = 0; i < size; i += 16) { + g_print ("%04x: ", i); + for (j = 0; j < 16; j++) { + if (i + j < size) { + g_print ("%02x ", data[i + j]); + } else { + g_print (" "); + } + } + for (j = 0; j < 16; j++) { + if (i + j < size) { + g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.'); + } + } + g_print ("\n"); + } +} diff --git a/rtmp/rtmputils.h b/rtmp/rtmputils.h new file mode 100644 index 0000000..5cc4e6e --- /dev/null +++ b/rtmp/rtmputils.h @@ -0,0 +1,32 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_UTILS_H_ +#define _GST_RTMP_UTILS_H_ + +#include + +G_BEGIN_DECLS + +void gst_rtmp_dump_data (GBytes * bytes); + +G_END_DECLS + +#endif + diff --git a/tools/proxy-server.c b/tools/proxy-server.c index fe8fc18..5abc013 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -25,6 +25,8 @@ #include #include #include "rtmpserver.h" +#include "rtmpclient.h" +#include "rtmputils.h" #define GETTEXT_PACKAGE NULL @@ -34,8 +36,13 @@ add_connection (GstRtmpServer * server, GstRtmpConnection * connection, static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data); -static void dump_data (GBytes * bytes); +static void +connect_done (GObject * source, GAsyncResult * result, gpointer user_data); +GstRtmpServer *server; +GstRtmpClient *client; +GCancellable *cancellable; +GstRtmpChunk *proxy_chunk; gboolean verbose; @@ -51,7 +58,6 @@ main (int argc, char *argv[]) GError *error = NULL; GOptionContext *context; GMainLoop *main_loop; - GstRtmpServer *server; context = g_option_context_new ("- FIXME"); g_option_context_add_main_entries (context, entries, GETTEXT_PACKAGE); @@ -67,6 +73,10 @@ main (int argc, char *argv[]) NULL); gst_rtmp_server_start (server); + client = gst_rtmp_client_new (); + cancellable = g_cancellable_new (); + + main_loop = g_main_loop_new (NULL, TRUE); g_main_loop_run (main_loop); @@ -77,9 +87,11 @@ static void add_connection (GstRtmpServer * server, GstRtmpConnection * connection, gpointer user_data) { - GST_ERROR ("new connection"); + GST_INFO ("new connection"); g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL); + + gst_rtmp_client_connect_async (client, cancellable, connect_done, client); } static void @@ -87,35 +99,43 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data) { GBytes *bytes; + GstRtmpConnection *proxy_conn; - GST_ERROR ("got chunk"); + GST_INFO ("got chunk"); bytes = gst_rtmp_chunk_get_payload (chunk); - dump_data (bytes); + gst_rtmp_dump_data (bytes); + + proxy_conn = gst_rtmp_client_get_connection (client); + + g_object_ref (chunk); + if (proxy_conn) { + gst_rtmp_connection_queue_chunk (proxy_conn, chunk); + } else { + /* save it for after the connection is complete */ + proxy_chunk = chunk; + } } static void -dump_data (GBytes * bytes) +connect_done (GObject * source, GAsyncResult * result, gpointer user_data) { - const guint8 *data; - gsize size; - int i, j; - - data = g_bytes_get_data (bytes, &size); - for (i = 0; i < size; i += 16) { - g_print ("%04x: ", i); - for (j = 0; j < 16; j++) { - if (i + j < size) { - g_print ("%02x ", data[i + j]); - } else { - g_print (" "); - } - } - for (j = 0; j < 16; j++) { - if (i + j < size) { - g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.'); - } - } - g_print ("\n"); + GstRtmpClient *client = user_data; + GError *error = NULL; + gboolean ret; + + GST_INFO ("connect_done"); + + ret = gst_rtmp_client_connect_finish (client, result, &error); + if (!ret) { + GST_ERROR ("error: %s", error->message); + g_error_free (error); + } + if (proxy_chunk) { + GstRtmpConnection *proxy_conn; + + proxy_conn = gst_rtmp_client_get_connection (client); + gst_rtmp_connection_queue_chunk (proxy_conn, proxy_chunk); + proxy_chunk = NULL; } } -- cgit v1.2.3