diff options
author | David Schleef <ds@schleef.org> | 2014-08-23 23:25:03 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-23 23:25:03 -0700 |
commit | fde02a939319b2509a4cd2759215deec933232a0 (patch) | |
tree | d7921cc32fb482d730677f3013c228fe8457a077 | |
parent | 6b0cac8398d8a85c3686729cce1f8c1180a90673 (diff) |
change GstRtmpServerConnection to GstRtmpConnection
-rw-r--r-- | rtmp/Makefile.am | 2 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 397 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 15 | ||||
-rw-r--r-- | rtmp/rtmpserver.c | 19 | ||||
-rw-r--r-- | rtmp/rtmpserver.h | 10 | ||||
-rw-r--r-- | rtmp/rtmpserverconnection.c | 527 | ||||
-rw-r--r-- | rtmp/rtmpserverconnection.h | 65 | ||||
-rw-r--r-- | tools/proxy-server.c | 8 |
8 files changed, 430 insertions, 613 deletions
diff --git a/rtmp/Makefile.am b/rtmp/Makefile.am index 52c061b..18bc37c 100644 --- a/rtmp/Makefile.am +++ b/rtmp/Makefile.am @@ -29,7 +29,5 @@ sources = \ rtmpchunk.h \ rtmpserver.c \ rtmpserver.h \ - rtmpserverconnection.c \ - rtmpserverconnection.h \ rtmpstream.c \ rtmpstream.h diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 711877b..86181bd 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -23,6 +23,10 @@ #include <gst/gst.h> #include "rtmpconnection.h" +#include "rtmpchunk.h" +#include "amf.h" + +#include <string.h> GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category); #define GST_CAT_DEFAULT gst_rtmp_connection_debug_category @@ -36,6 +40,8 @@ static void gst_rtmp_connection_get_property (GObject * object, static void gst_rtmp_connection_dispose (GObject * object); static void gst_rtmp_connection_finalize (GObject * object); +static void proxy_connect (GstRtmpConnection * sc); +static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc); enum { @@ -44,7 +50,8 @@ enum /* class initialization */ -G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection, G_TYPE_OBJECT, +G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection, + G_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category, "rtmpconnection", 0, "debug category for GstRtmpConnection class")); @@ -58,11 +65,16 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass) gobject_class->dispose = gst_rtmp_connection_dispose; gobject_class->finalize = gst_rtmp_connection_finalize; + g_signal_new ("got-chunk", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, + got_chunk), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); } static void gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) { + rtmpconnection->cancellable = g_cancellable_new (); } void @@ -118,3 +130,386 @@ gst_rtmp_connection_finalize (GObject * object) G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object); } + +GstRtmpConnection * +gst_rtmp_connection_new (GSocketConnection * connection) +{ + GstRtmpConnection *sc; + + sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); + sc->connection = connection; + + //proxy_connect (sc); + //gst_rtmp_connection_read_chunk (sc); + gst_rtmp_connection_handshake1 (sc); + + return sc; +} + +typedef struct _ChunkRead ChunkRead; +struct _ChunkRead +{ + gpointer data; + gsize alloc_size; + gsize size; + GstRtmpConnection *connection; +}; + + +static void proxy_connect (GstRtmpConnection * sc); +static void proxy_connect_done (GObject * obj, GAsyncResult * res, + gpointer user_data); +static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc); +static void gst_rtmp_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk); +static void proxy_write_done (GObject * obj, GAsyncResult * res, + gpointer user_data); +static void proxy_read_chunk (GstRtmpConnection * sc); +static void proxy_read_done (GObject * obj, GAsyncResult * res, + gpointer user_data); +static void gst_rtmp_connection_write_chunk (GstRtmpConnection * + sc, ChunkRead * chunk); +static void gst_rtmp_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc); +static void gst_rtmp_connection_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, + GBytes * bytes); +static void gst_rtmp_connection_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_connection_handshake3 (GstRtmpConnection * sc); +static void gst_rtmp_connection_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); + + +static void +proxy_connect (GstRtmpConnection * sc) +{ + GSocketConnectable *addr; + + GST_ERROR ("proxy_connect"); + + addr = g_network_address_new ("localhost", 1935); + + sc->socket_client = g_socket_client_new (); + g_socket_client_connect_async (sc->socket_client, addr, + sc->cancellable, proxy_connect_done, sc); +} + +static void +proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data) +{ + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + + GST_ERROR ("proxy_connect_done"); + + sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client, + res, &error); + if (sc->proxy_connection == NULL) { + GST_ERROR ("connection error: %s", error->message); + g_error_free (error); + return; + } + + gst_rtmp_connection_read_chunk (sc); +} + +static void +gst_rtmp_connection_read_chunk (GstRtmpConnection * sc) +{ + GInputStream *is; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + + g_input_stream_read_bytes_async (is, 4096, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_read_chunk_done, sc); +} + +G_GNUC_UNUSED static void +parse_message (guint8 * data, int size) +{ + int offset; + int bytes_read; + GstAmfNode *node; + + offset = 4; + + node = gst_amf_node_new_parse ((const char *) (data + offset), + size - offset, &bytes_read); + offset += bytes_read; + g_print ("bytes_read: %d\n", bytes_read); + if (node) + gst_amf_node_free (node); + + node = gst_amf_node_new_parse ((const char *) (data + offset), + size - offset, &bytes_read); + offset += bytes_read; + g_print ("bytes_read: %d\n", bytes_read); + if (node) + gst_amf_node_free (node); + + node = gst_amf_node_new_parse ((const char *) (data + offset), + size - offset, &bytes_read); + offset += bytes_read; + g_print ("bytes_read: %d\n", bytes_read); + if (node) + gst_amf_node_free (node); + +} + +static void +gst_rtmp_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + GstRtmpConnection *connection = GST_RTMP_CONNECTION (user_data); + GstRtmpChunk *chunk; + GError *error = NULL; + gsize chunk_size; + GBytes *bytes; + + GST_ERROR ("gst_rtmp_connection_read_chunk_done"); + + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size); + + if (chunk) { + g_signal_emit_by_name (connection, "got-chunk", chunk); + + g_object_unref (chunk); + } +} + +G_GNUC_UNUSED static void +proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk) +{ + GOutputStream *os; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection)); + + g_output_stream_write_async (os, chunk->data, chunk->size, + G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk); +} + +static void +proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("proxy_write_done"); + + ret = g_output_stream_write_finish (os, res, &error); + if (ret < 0) { + GST_ERROR ("write error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret); + + proxy_read_chunk (chunk->connection); + + g_free (chunk->data); + g_free (chunk); +} + +static void +proxy_read_chunk (GstRtmpConnection * sc) +{ + GInputStream *is; + ChunkRead *chunk; + + chunk = g_malloc0 (sizeof (ChunkRead)); + chunk->alloc_size = 4096; + chunk->data = g_malloc (chunk->alloc_size); + chunk->connection = sc; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection)); + + g_input_stream_read_async (is, chunk->data, chunk->alloc_size, + G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk); +} + +static void +proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("proxy_read_done"); + + ret = g_input_stream_read_finish (is, res, &error); + if (ret < 0) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret); + chunk->size = ret; + + gst_rtmp_connection_write_chunk (chunk->connection, chunk); +} + +static void +gst_rtmp_connection_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk) +{ + GOutputStream *os; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + + g_output_stream_write_async (os, chunk->data, chunk->size, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_write_chunk_done, chunk); +} + +static void +gst_rtmp_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("gst_rtmp_connection_write_chunk_done"); + + ret = g_output_stream_write_finish (os, res, &error); + if (ret < 0) { + GST_ERROR ("write error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); + + gst_rtmp_connection_read_chunk (chunk->connection); + + g_free (chunk->data); + g_free (chunk); +} + +static void +gst_rtmp_connection_handshake1 (GstRtmpConnection * sc) +{ + GInputStream *is; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + + g_input_stream_read_bytes_async (is, 4096, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_handshake1_done, sc); +} + +static void +gst_rtmp_connection_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + GBytes *bytes; + + GST_ERROR ("gst_rtmp_connection_handshake1_done"); + + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_connection_handshake2 (sc, bytes); +} + +static void +gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, GBytes * bytes) +{ + GOutputStream *os; + guint8 *data; + GBytes *out_bytes; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + + data = g_malloc (1 + 1536 + 1536); + memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); + memset (data + 1537, 0, 8); + memset (data + 1537 + 8, 0xef, 1528); + + out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); + + g_output_stream_write_bytes_async (os, out_bytes, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_handshake2_done, sc); +} + +static void +gst_rtmp_connection_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + gssize ret; + + GST_ERROR ("gst_rtmp_connection_handshake2_done"); + + ret = g_output_stream_write_bytes_finish (os, res, &error); + if (ret < 1 + 1536 + 1536) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret); + + gst_rtmp_connection_handshake3 (sc); +} + +static void +gst_rtmp_connection_handshake3 (GstRtmpConnection * sc) +{ + GInputStream *is; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + + g_input_stream_read_bytes_async (is, 1536, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_connection_handshake3_done, sc); +} + +static void +gst_rtmp_connection_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + GBytes *bytes; + + GST_ERROR ("gst_rtmp_connection_handshake3_done"); + + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_connection_read_chunk (sc); + (void) &proxy_connect; +} diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index c6a821f..8713147 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -20,6 +20,8 @@ #ifndef _GST_RTMP_CONNECTION_H_ #define _GST_RTMP_CONNECTION_H_ +#include <gio/gio.h> +#include <rtmp/rtmpchunk.h> G_BEGIN_DECLS @@ -36,15 +38,28 @@ struct _GstRtmpConnection { GObject object; + /* private */ + GSocketConnection *connection; + GSocketConnection *proxy_connection; + GCancellable *cancellable; + int state; + GSocketClient *socket_client; }; struct _GstRtmpConnectionClass { GObjectClass object_class; + + /* signals */ + void (*got_chunk) (GstRtmpConnection *connection, + GstRtmpChunk *chunk); }; GType gst_rtmp_connection_get_type (void); +GstRtmpConnection *gst_rtmp_connection_new ( + GSocketConnection *connection); + G_END_DECLS #endif diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c index 4996073..e35bff6 100644 --- a/rtmp/rtmpserver.c +++ b/rtmp/rtmpserver.c @@ -64,11 +64,11 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass) g_signal_new ("add-connection", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass, add_connection), NULL, NULL, g_cclosure_marshal_generic, - G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION); + G_TYPE_NONE, 1, GST_TYPE_RTMP_CONNECTION); g_signal_new ("remove-connection", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass, remove_connection), NULL, NULL, g_cclosure_marshal_generic, - G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION); + G_TYPE_NONE, 1, GST_TYPE_RTMP_CONNECTION); } static void @@ -173,23 +173,24 @@ gst_rtmp_server_start (GstRtmpServer * rtmpserver) static gboolean gst_rtmp_server_incoming (GSocketService * service, - GSocketConnection * connection, GObject * source_object, gpointer user_data) + GSocketConnection * socket_connection, GObject * source_object, + gpointer user_data) { GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data); - GstRtmpServerConnection *server_connection; + GstRtmpConnection *connection; GST_ERROR ("client connected"); - g_object_ref (connection); - server_connection = gst_rtmp_server_connection_new (connection); - gst_rtmp_server_add_connection (rtmpserver, server_connection); + g_object_ref (socket_connection); + connection = gst_rtmp_connection_new (socket_connection); + gst_rtmp_server_add_connection (rtmpserver, connection); return TRUE; } void gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver, - GstRtmpServerConnection * connection) + GstRtmpConnection * connection) { rtmpserver->connections = g_list_prepend (rtmpserver->connections, connection); @@ -198,7 +199,7 @@ gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver, void gst_rtmp_server_remove_connection (GstRtmpServer * rtmpserver, - GstRtmpServerConnection * connection) + GstRtmpConnection * connection) { rtmpserver->connections = g_list_remove (rtmpserver->connections, connection); g_signal_emit_by_name (rtmpserver, "remove-connection", connection); diff --git a/rtmp/rtmpserver.h b/rtmp/rtmpserver.h index 75c71e3..ac5c22e 100644 --- a/rtmp/rtmpserver.h +++ b/rtmp/rtmpserver.h @@ -22,7 +22,7 @@ #include <gio/gio.h> -#include <rtmp/rtmpserverconnection.h> +#include <rtmp/rtmpconnection.h> G_BEGIN_DECLS @@ -54,9 +54,9 @@ struct _GstRtmpServerClass /* signals */ void (*add_connection) (GstRtmpServer *server, - GstRtmpServerConnection *connection); + GstRtmpConnection *connection); void (*remove_connection) (GstRtmpServer *server, - GstRtmpServerConnection *connection); + GstRtmpConnection *connection); }; GType gst_rtmp_server_get_type (void); @@ -64,9 +64,9 @@ GType gst_rtmp_server_get_type (void); GstRtmpServer *gst_rtmp_server_new (void); void gst_rtmp_server_start (GstRtmpServer * rtmpserver); void gst_rtmp_server_add_connection (GstRtmpServer *rtmpserver, - GstRtmpServerConnection *connection); + GstRtmpConnection *connection); void gst_rtmp_server_remove_connection (GstRtmpServer *rtmpserver, - GstRtmpServerConnection *connection); + GstRtmpConnection *connection); G_END_DECLS diff --git a/rtmp/rtmpserverconnection.c b/rtmp/rtmpserverconnection.c deleted file mode 100644 index ef79d19..0000000 --- a/rtmp/rtmpserverconnection.c +++ /dev/null @@ -1,527 +0,0 @@ -/* GStreamer RTMP Library - * Copyright (C) 2013 David Schleef <ds@schleef.org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, - * Boston, MA 02110-1335, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gst/gst.h> -#include "rtmpserverconnection.h" -#include "rtmpchunk.h" -#include "amf.h" - -#include <string.h> - -GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_connection_debug_category); -#define GST_CAT_DEFAULT gst_rtmp_server_connection_debug_category - -/* prototypes */ - -static void gst_rtmp_server_connection_set_property (GObject * object, - guint property_id, const GValue * value, GParamSpec * pspec); -static void gst_rtmp_server_connection_get_property (GObject * object, - guint property_id, GValue * value, GParamSpec * pspec); -static void gst_rtmp_server_connection_dispose (GObject * object); -static void gst_rtmp_server_connection_finalize (GObject * object); - -static void proxy_connect (GstRtmpServerConnection * sc); -static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * - sc); - -enum -{ - PROP_0 -}; - -/* class initialization */ - -G_DEFINE_TYPE_WITH_CODE (GstRtmpServerConnection, gst_rtmp_server_connection, - G_TYPE_OBJECT, - GST_DEBUG_CATEGORY_INIT (gst_rtmp_server_connection_debug_category, - "rtmpserverconnection", 0, - "debug category for GstRtmpServerConnection class")); - -static void -gst_rtmp_server_connection_class_init (GstRtmpServerConnectionClass * klass) -{ - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - gobject_class->set_property = gst_rtmp_server_connection_set_property; - gobject_class->get_property = gst_rtmp_server_connection_get_property; - gobject_class->dispose = gst_rtmp_server_connection_dispose; - gobject_class->finalize = gst_rtmp_server_connection_finalize; - - g_signal_new ("got-chunk", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerConnectionClass, - got_chunk), NULL, NULL, g_cclosure_marshal_generic, - G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); -} - -static void -gst_rtmp_server_connection_init (GstRtmpServerConnection * rtmpserverconnection) -{ - rtmpserverconnection->cancellable = g_cancellable_new (); -} - -void -gst_rtmp_server_connection_set_property (GObject * object, guint property_id, - const GValue * value, GParamSpec * pspec) -{ - GstRtmpServerConnection *rtmpserverconnection = - GST_RTMP_SERVER_CONNECTION (object); - - GST_DEBUG_OBJECT (rtmpserverconnection, "set_property"); - - switch (property_id) { - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); - break; - } -} - -void -gst_rtmp_server_connection_get_property (GObject * object, guint property_id, - GValue * value, GParamSpec * pspec) -{ - GstRtmpServerConnection *rtmpserverconnection = - GST_RTMP_SERVER_CONNECTION (object); - - GST_DEBUG_OBJECT (rtmpserverconnection, "get_property"); - - switch (property_id) { - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); - break; - } -} - -void -gst_rtmp_server_connection_dispose (GObject * object) -{ - GstRtmpServerConnection *rtmpserverconnection = - GST_RTMP_SERVER_CONNECTION (object); - - GST_DEBUG_OBJECT (rtmpserverconnection, "dispose"); - - /* clean up as possible. may be called multiple times */ - - G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->dispose (object); -} - -void -gst_rtmp_server_connection_finalize (GObject * object) -{ - GstRtmpServerConnection *rtmpserverconnection = - GST_RTMP_SERVER_CONNECTION (object); - - GST_DEBUG_OBJECT (rtmpserverconnection, "finalize"); - - /* clean up object here */ - - G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->finalize (object); -} - -GstRtmpServerConnection * -gst_rtmp_server_connection_new (GSocketConnection * connection) -{ - GstRtmpServerConnection *sc; - - sc = g_object_new (GST_TYPE_RTMP_SERVER_CONNECTION, NULL); - sc->connection = connection; - - //proxy_connect (sc); - //gst_rtmp_server_connection_read_chunk (sc); - gst_rtmp_server_connection_handshake1 (sc); - - return sc; -} - -typedef struct _ChunkRead ChunkRead; -struct _ChunkRead -{ - gpointer data; - gsize alloc_size; - gsize size; - GstRtmpServerConnection *server_connection; -}; - - -static void proxy_connect (GstRtmpServerConnection * sc); -static void proxy_connect_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * - sc); -static void gst_rtmp_server_connection_read_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk); -static void proxy_write_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void proxy_read_chunk (GstRtmpServerConnection * sc); -static void proxy_read_done (GObject * obj, GAsyncResult * res, - gpointer user_data); -static void gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection * - sc, ChunkRead * chunk); -static void gst_rtmp_server_connection_write_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * - sc); -static void gst_rtmp_server_connection_handshake1_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc, - GBytes * bytes); -static void gst_rtmp_server_connection_handshake2_done (GObject * obj, - GAsyncResult * res, gpointer user_data); -static void gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection * - sc); -static void gst_rtmp_server_connection_handshake3_done (GObject * obj, - GAsyncResult * res, gpointer user_data); - - -static void -proxy_connect (GstRtmpServerConnection * sc) -{ - GSocketConnectable *addr; - - GST_ERROR ("proxy_connect"); - - addr = g_network_address_new ("localhost", 1935); - - sc->socket_client = g_socket_client_new (); - g_socket_client_connect_async (sc->socket_client, addr, - sc->cancellable, proxy_connect_done, sc); -} - -static void -proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data) -{ - GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); - GError *error = NULL; - - GST_ERROR ("proxy_connect_done"); - - sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client, - res, &error); - if (sc->proxy_connection == NULL) { - GST_ERROR ("connection error: %s", error->message); - g_error_free (error); - return; - } - - gst_rtmp_server_connection_read_chunk (sc); -} - -static void -gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * sc) -{ - GInputStream *is; - - is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - - g_input_stream_read_bytes_async (is, 4096, - G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_server_connection_read_chunk_done, sc); -} - -G_GNUC_UNUSED static void -parse_message (guint8 * data, int size) -{ - int offset; - int bytes_read; - GstAmfNode *node; - - offset = 4; - - node = gst_amf_node_new_parse ((const char *) (data + offset), - size - offset, &bytes_read); - offset += bytes_read; - g_print ("bytes_read: %d\n", bytes_read); - if (node) - gst_amf_node_free (node); - - node = gst_amf_node_new_parse ((const char *) (data + offset), - size - offset, &bytes_read); - offset += bytes_read; - g_print ("bytes_read: %d\n", bytes_read); - if (node) - gst_amf_node_free (node); - - node = gst_amf_node_new_parse ((const char *) (data + offset), - size - offset, &bytes_read); - offset += bytes_read; - g_print ("bytes_read: %d\n", bytes_read); - if (node) - gst_amf_node_free (node); - -} - -static void -gst_rtmp_server_connection_read_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data) -{ - GInputStream *is = G_INPUT_STREAM (obj); - GstRtmpServerConnection *server_connection = - GST_RTMP_SERVER_CONNECTION (user_data); - GstRtmpChunk *chunk; - GError *error = NULL; - gsize chunk_size; - GBytes *bytes; - - GST_ERROR ("gst_rtmp_server_connection_read_chunk_done"); - - bytes = g_input_stream_read_bytes_finish (is, res, &error); - if (bytes == NULL) { - GST_ERROR ("read error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); - - chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size); - - if (chunk) { - g_signal_emit_by_name (server_connection, "got-chunk", chunk); - - g_object_unref (chunk); - } -} - -G_GNUC_UNUSED static void -proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk) -{ - GOutputStream *os; - - os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection)); - - g_output_stream_write_async (os, chunk->data, chunk->size, - G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk); -} - -static void -proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data) -{ - GOutputStream *os = G_OUTPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; - GError *error = NULL; - gssize ret; - - GST_ERROR ("proxy_write_done"); - - ret = g_output_stream_write_finish (os, res, &error); - if (ret < 0) { - GST_ERROR ("write error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret); - - proxy_read_chunk (chunk->server_connection); - - g_free (chunk->data); - g_free (chunk); -} - -static void -proxy_read_chunk (GstRtmpServerConnection * sc) -{ - GInputStream *is; - ChunkRead *chunk; - - chunk = g_malloc0 (sizeof (ChunkRead)); - chunk->alloc_size = 4096; - chunk->data = g_malloc (chunk->alloc_size); - chunk->server_connection = sc; - - is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection)); - - g_input_stream_read_async (is, chunk->data, chunk->alloc_size, - G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk); -} - -static void -proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data) -{ - GInputStream *is = G_INPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; - GError *error = NULL; - gssize ret; - - GST_ERROR ("proxy_read_done"); - - ret = g_input_stream_read_finish (is, res, &error); - if (ret < 0) { - GST_ERROR ("read error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret); - chunk->size = ret; - - gst_rtmp_server_connection_write_chunk (chunk->server_connection, chunk); -} - -static void -gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection * sc, - ChunkRead * chunk) -{ - GOutputStream *os; - - os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - - g_output_stream_write_async (os, chunk->data, chunk->size, - G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_server_connection_write_chunk_done, chunk); -} - -static void -gst_rtmp_server_connection_write_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data) -{ - GOutputStream *os = G_OUTPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; - GError *error = NULL; - gssize ret; - - GST_ERROR ("gst_rtmp_server_connection_write_chunk_done"); - - ret = g_output_stream_write_finish (os, res, &error); - if (ret < 0) { - GST_ERROR ("write error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); - - gst_rtmp_server_connection_read_chunk (chunk->server_connection); - - g_free (chunk->data); - g_free (chunk); -} - -static void -gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * sc) -{ - GInputStream *is; - - is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - - g_input_stream_read_bytes_async (is, 4096, - G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_server_connection_handshake1_done, sc); -} - -static void -gst_rtmp_server_connection_handshake1_done (GObject * obj, - GAsyncResult * res, gpointer user_data) -{ - GInputStream *is = G_INPUT_STREAM (obj); - GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); - GError *error = NULL; - GBytes *bytes; - - GST_ERROR ("gst_rtmp_server_connection_handshake1_done"); - - bytes = g_input_stream_read_bytes_finish (is, res, &error); - if (bytes == NULL) { - GST_ERROR ("read error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); - - gst_rtmp_server_connection_handshake2 (sc, bytes); -} - -static void -gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc, - GBytes * bytes) -{ - GOutputStream *os; - guint8 *data; - GBytes *out_bytes; - - os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - - data = g_malloc (1 + 1536 + 1536); - memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); - memset (data + 1537, 0, 8); - memset (data + 1537 + 8, 0xef, 1528); - - out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); - - g_output_stream_write_bytes_async (os, out_bytes, - G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_server_connection_handshake2_done, sc); -} - -static void -gst_rtmp_server_connection_handshake2_done (GObject * obj, - GAsyncResult * res, gpointer user_data) -{ - GOutputStream *os = G_OUTPUT_STREAM (obj); - GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); - GError *error = NULL; - gssize ret; - - GST_ERROR ("gst_rtmp_server_connection_handshake2_done"); - - ret = g_output_stream_write_bytes_finish (os, res, &error); - if (ret < 1 + 1536 + 1536) { - GST_ERROR ("read error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret); - - gst_rtmp_server_connection_handshake3 (sc); -} - -static void -gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection * sc) -{ - GInputStream *is; - - is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - - g_input_stream_read_bytes_async (is, 1536, - G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_server_connection_handshake3_done, sc); -} - -static void -gst_rtmp_server_connection_handshake3_done (GObject * obj, - GAsyncResult * res, gpointer user_data) -{ - GInputStream *is = G_INPUT_STREAM (obj); - GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); - GError *error = NULL; - GBytes *bytes; - - GST_ERROR ("gst_rtmp_server_connection_handshake3_done"); - - bytes = g_input_stream_read_bytes_finish (is, res, &error); - if (bytes == NULL) { - GST_ERROR ("read error: %s", error->message); - g_error_free (error); - return; - } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); - - gst_rtmp_server_connection_read_chunk (sc); - (void) &proxy_connect; -} diff --git a/rtmp/rtmpserverconnection.h b/rtmp/rtmpserverconnection.h deleted file mode 100644 index 19f06ab..0000000 --- a/rtmp/rtmpserverconnection.h +++ /dev/null @@ -1,65 +0,0 @@ -/* GStreamer RTMP Library - * Copyright (C) 2013 David Schleef <ds@schleef.org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -#ifndef _GST_RTMP_SERVER_CONNECTION_H_ -#define _GST_RTMP_SERVER_CONNECTION_H_ - -#include <gio/gio.h> -#include <rtmp/rtmpchunk.h> - -G_BEGIN_DECLS - -#define GST_TYPE_RTMP_SERVER_CONNECTION (gst_rtmp_server_connection_get_type()) -#define GST_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnection)) -#define GST_RTMP_SERVER_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnectionClass)) -#define GST_IS_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_SERVER_CONNECTION)) -#define GST_IS_RTMP_SERVER_CONNECTION_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_SERVER_CONNECTION)) - -typedef struct _GstRtmpServerConnection GstRtmpServerConnection; -typedef struct _GstRtmpServerConnectionClass GstRtmpServerConnectionClass; - -struct _GstRtmpServerConnection -{ - GObject object; - - /* private */ - GSocketConnection *connection; - GSocketConnection *proxy_connection; - GCancellable *cancellable; - int state; - GSocketClient *socket_client; -}; - -struct _GstRtmpServerConnectionClass -{ - GObjectClass object_class; - - /* signals */ - void (*got_chunk) (GstRtmpServerConnection *connection, - GstRtmpChunk *chunk); -}; - -GType gst_rtmp_server_connection_get_type (void); - -GstRtmpServerConnection *gst_rtmp_server_connection_new ( - GSocketConnection *connection); - -G_END_DECLS - -#endif diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 37629af..fe8fc18 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -29,10 +29,10 @@ #define GETTEXT_PACKAGE NULL static void -add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection, +add_connection (GstRtmpServer * server, GstRtmpConnection * connection, gpointer user_data); static void -got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk, +got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data); static void dump_data (GBytes * bytes); @@ -74,7 +74,7 @@ main (int argc, char *argv[]) } static void -add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection, +add_connection (GstRtmpServer * server, GstRtmpConnection * connection, gpointer user_data) { GST_ERROR ("new connection"); @@ -83,7 +83,7 @@ add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection, } static void -got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk, +got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data) { GBytes *bytes; |