summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-23 23:25:03 -0700
committerDavid Schleef <ds@schleef.org>2014-08-23 23:25:03 -0700
commitfde02a939319b2509a4cd2759215deec933232a0 (patch)
treed7921cc32fb482d730677f3013c228fe8457a077
parent6b0cac8398d8a85c3686729cce1f8c1180a90673 (diff)
change GstRtmpServerConnection to GstRtmpConnection
-rw-r--r--rtmp/Makefile.am2
-rw-r--r--rtmp/rtmpconnection.c397
-rw-r--r--rtmp/rtmpconnection.h15
-rw-r--r--rtmp/rtmpserver.c19
-rw-r--r--rtmp/rtmpserver.h10
-rw-r--r--rtmp/rtmpserverconnection.c527
-rw-r--r--rtmp/rtmpserverconnection.h65
-rw-r--r--tools/proxy-server.c8
8 files changed, 430 insertions, 613 deletions
diff --git a/rtmp/Makefile.am b/rtmp/Makefile.am
index 52c061b..18bc37c 100644
--- a/rtmp/Makefile.am
+++ b/rtmp/Makefile.am
@@ -29,7 +29,5 @@ sources = \
rtmpchunk.h \
rtmpserver.c \
rtmpserver.h \
- rtmpserverconnection.c \
- rtmpserverconnection.h \
rtmpstream.c \
rtmpstream.h
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 711877b..86181bd 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -23,6 +23,10 @@
#include <gst/gst.h>
#include "rtmpconnection.h"
+#include "rtmpchunk.h"
+#include "amf.h"
+
+#include <string.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
@@ -36,6 +40,8 @@ static void gst_rtmp_connection_get_property (GObject * object,
static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object);
+static void proxy_connect (GstRtmpConnection * sc);
+static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc);
enum
{
@@ -44,7 +50,8 @@ enum
/* class initialization */
-G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection, G_TYPE_OBJECT,
+G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
+ G_TYPE_OBJECT,
GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
"rtmpconnection", 0, "debug category for GstRtmpConnection class"));
@@ -58,11 +65,16 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
gobject_class->dispose = gst_rtmp_connection_dispose;
gobject_class->finalize = gst_rtmp_connection_finalize;
+ g_signal_new ("got-chunk", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass,
+ got_chunk), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
}
static void
gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
+ rtmpconnection->cancellable = g_cancellable_new ();
}
void
@@ -118,3 +130,386 @@ gst_rtmp_connection_finalize (GObject * object)
G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
}
+
+GstRtmpConnection *
+gst_rtmp_connection_new (GSocketConnection * connection)
+{
+ GstRtmpConnection *sc;
+
+ sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
+ sc->connection = connection;
+
+ //proxy_connect (sc);
+ //gst_rtmp_connection_read_chunk (sc);
+ gst_rtmp_connection_handshake1 (sc);
+
+ return sc;
+}
+
+typedef struct _ChunkRead ChunkRead;
+struct _ChunkRead
+{
+ gpointer data;
+ gsize alloc_size;
+ gsize size;
+ GstRtmpConnection *connection;
+};
+
+
+static void proxy_connect (GstRtmpConnection * sc);
+static void proxy_connect_done (GObject * obj, GAsyncResult * res,
+ gpointer user_data);
+static void gst_rtmp_connection_read_chunk (GstRtmpConnection * sc);
+static void gst_rtmp_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk);
+static void proxy_write_done (GObject * obj, GAsyncResult * res,
+ gpointer user_data);
+static void proxy_read_chunk (GstRtmpConnection * sc);
+static void proxy_read_done (GObject * obj, GAsyncResult * res,
+ gpointer user_data);
+static void gst_rtmp_connection_write_chunk (GstRtmpConnection *
+ sc, ChunkRead * chunk);
+static void gst_rtmp_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_handshake1 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_handshake2 (GstRtmpConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_connection_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_connection_handshake3 (GstRtmpConnection * sc);
+static void gst_rtmp_connection_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+
+
+static void
+proxy_connect (GstRtmpConnection * sc)
+{
+ GSocketConnectable *addr;
+
+ GST_ERROR ("proxy_connect");
+
+ addr = g_network_address_new ("localhost", 1935);
+
+ sc->socket_client = g_socket_client_new ();
+ g_socket_client_connect_async (sc->socket_client, addr,
+ sc->cancellable, proxy_connect_done, sc);
+}
+
+static void
+proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+{
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+
+ GST_ERROR ("proxy_connect_done");
+
+ sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client,
+ res, &error);
+ if (sc->proxy_connection == NULL) {
+ GST_ERROR ("connection error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+
+ gst_rtmp_connection_read_chunk (sc);
+}
+
+static void
+gst_rtmp_connection_read_chunk (GstRtmpConnection * sc)
+{
+ GInputStream *is;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+
+ g_input_stream_read_bytes_async (is, 4096,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_read_chunk_done, sc);
+}
+
+G_GNUC_UNUSED static void
+parse_message (guint8 * data, int size)
+{
+ int offset;
+ int bytes_read;
+ GstAmfNode *node;
+
+ offset = 4;
+
+ node = gst_amf_node_new_parse ((const char *) (data + offset),
+ size - offset, &bytes_read);
+ offset += bytes_read;
+ g_print ("bytes_read: %d\n", bytes_read);
+ if (node)
+ gst_amf_node_free (node);
+
+ node = gst_amf_node_new_parse ((const char *) (data + offset),
+ size - offset, &bytes_read);
+ offset += bytes_read;
+ g_print ("bytes_read: %d\n", bytes_read);
+ if (node)
+ gst_amf_node_free (node);
+
+ node = gst_amf_node_new_parse ((const char *) (data + offset),
+ size - offset, &bytes_read);
+ offset += bytes_read;
+ g_print ("bytes_read: %d\n", bytes_read);
+ if (node)
+ gst_amf_node_free (node);
+
+}
+
+static void
+gst_rtmp_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ GstRtmpConnection *connection = GST_RTMP_CONNECTION (user_data);
+ GstRtmpChunk *chunk;
+ GError *error = NULL;
+ gsize chunk_size;
+ GBytes *bytes;
+
+ GST_ERROR ("gst_rtmp_connection_read_chunk_done");
+
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size);
+
+ if (chunk) {
+ g_signal_emit_by_name (connection, "got-chunk", chunk);
+
+ g_object_unref (chunk);
+ }
+}
+
+G_GNUC_UNUSED static void
+proxy_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk)
+{
+ GOutputStream *os;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection));
+
+ g_output_stream_write_async (os, chunk->data, chunk->size,
+ G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk);
+}
+
+static void
+proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("proxy_write_done");
+
+ ret = g_output_stream_write_finish (os, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("write error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret);
+
+ proxy_read_chunk (chunk->connection);
+
+ g_free (chunk->data);
+ g_free (chunk);
+}
+
+static void
+proxy_read_chunk (GstRtmpConnection * sc)
+{
+ GInputStream *is;
+ ChunkRead *chunk;
+
+ chunk = g_malloc0 (sizeof (ChunkRead));
+ chunk->alloc_size = 4096;
+ chunk->data = g_malloc (chunk->alloc_size);
+ chunk->connection = sc;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection));
+
+ g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
+ G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk);
+}
+
+static void
+proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("proxy_read_done");
+
+ ret = g_input_stream_read_finish (is, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret);
+ chunk->size = ret;
+
+ gst_rtmp_connection_write_chunk (chunk->connection, chunk);
+}
+
+static void
+gst_rtmp_connection_write_chunk (GstRtmpConnection * sc, ChunkRead * chunk)
+{
+ GOutputStream *os;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+
+ g_output_stream_write_async (os, chunk->data, chunk->size,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_write_chunk_done, chunk);
+}
+
+static void
+gst_rtmp_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("gst_rtmp_connection_write_chunk_done");
+
+ ret = g_output_stream_write_finish (os, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("write error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
+
+ gst_rtmp_connection_read_chunk (chunk->connection);
+
+ g_free (chunk->data);
+ g_free (chunk);
+}
+
+static void
+gst_rtmp_connection_handshake1 (GstRtmpConnection * sc)
+{
+ GInputStream *is;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+
+ g_input_stream_read_bytes_async (is, 4096,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_handshake1_done, sc);
+}
+
+static void
+gst_rtmp_connection_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+ GBytes *bytes;
+
+ GST_ERROR ("gst_rtmp_connection_handshake1_done");
+
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_connection_handshake2 (sc, bytes);
+}
+
+static void
+gst_rtmp_connection_handshake2 (GstRtmpConnection * sc, GBytes * bytes)
+{
+ GOutputStream *os;
+ guint8 *data;
+ GBytes *out_bytes;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+
+ data = g_malloc (1 + 1536 + 1536);
+ memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
+ memset (data + 1537, 0, 8);
+ memset (data + 1537 + 8, 0xef, 1528);
+
+ out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
+
+ g_output_stream_write_bytes_async (os, out_bytes,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_handshake2_done, sc);
+}
+
+static void
+gst_rtmp_connection_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("gst_rtmp_connection_handshake2_done");
+
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
+ if (ret < 1 + 1536 + 1536) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
+
+ gst_rtmp_connection_handshake3 (sc);
+}
+
+static void
+gst_rtmp_connection_handshake3 (GstRtmpConnection * sc)
+{
+ GInputStream *is;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+
+ g_input_stream_read_bytes_async (is, 1536,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_connection_handshake3_done, sc);
+}
+
+static void
+gst_rtmp_connection_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+ GBytes *bytes;
+
+ GST_ERROR ("gst_rtmp_connection_handshake3_done");
+
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_connection_read_chunk (sc);
+ (void) &proxy_connect;
+}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index c6a821f..8713147 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -20,6 +20,8 @@
#ifndef _GST_RTMP_CONNECTION_H_
#define _GST_RTMP_CONNECTION_H_
+#include <gio/gio.h>
+#include <rtmp/rtmpchunk.h>
G_BEGIN_DECLS
@@ -36,15 +38,28 @@ struct _GstRtmpConnection
{
GObject object;
+ /* private */
+ GSocketConnection *connection;
+ GSocketConnection *proxy_connection;
+ GCancellable *cancellable;
+ int state;
+ GSocketClient *socket_client;
};
struct _GstRtmpConnectionClass
{
GObjectClass object_class;
+
+ /* signals */
+ void (*got_chunk) (GstRtmpConnection *connection,
+ GstRtmpChunk *chunk);
};
GType gst_rtmp_connection_get_type (void);
+GstRtmpConnection *gst_rtmp_connection_new (
+ GSocketConnection *connection);
+
G_END_DECLS
#endif
diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c
index 4996073..e35bff6 100644
--- a/rtmp/rtmpserver.c
+++ b/rtmp/rtmpserver.c
@@ -64,11 +64,11 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass)
g_signal_new ("add-connection", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass,
add_connection), NULL, NULL, g_cclosure_marshal_generic,
- G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION);
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_CONNECTION);
g_signal_new ("remove-connection", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass,
remove_connection), NULL, NULL, g_cclosure_marshal_generic,
- G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION);
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_CONNECTION);
}
static void
@@ -173,23 +173,24 @@ gst_rtmp_server_start (GstRtmpServer * rtmpserver)
static gboolean
gst_rtmp_server_incoming (GSocketService * service,
- GSocketConnection * connection, GObject * source_object, gpointer user_data)
+ GSocketConnection * socket_connection, GObject * source_object,
+ gpointer user_data)
{
GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data);
- GstRtmpServerConnection *server_connection;
+ GstRtmpConnection *connection;
GST_ERROR ("client connected");
- g_object_ref (connection);
- server_connection = gst_rtmp_server_connection_new (connection);
- gst_rtmp_server_add_connection (rtmpserver, server_connection);
+ g_object_ref (socket_connection);
+ connection = gst_rtmp_connection_new (socket_connection);
+ gst_rtmp_server_add_connection (rtmpserver, connection);
return TRUE;
}
void
gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver,
- GstRtmpServerConnection * connection)
+ GstRtmpConnection * connection)
{
rtmpserver->connections = g_list_prepend (rtmpserver->connections,
connection);
@@ -198,7 +199,7 @@ gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver,
void
gst_rtmp_server_remove_connection (GstRtmpServer * rtmpserver,
- GstRtmpServerConnection * connection)
+ GstRtmpConnection * connection)
{
rtmpserver->connections = g_list_remove (rtmpserver->connections, connection);
g_signal_emit_by_name (rtmpserver, "remove-connection", connection);
diff --git a/rtmp/rtmpserver.h b/rtmp/rtmpserver.h
index 75c71e3..ac5c22e 100644
--- a/rtmp/rtmpserver.h
+++ b/rtmp/rtmpserver.h
@@ -22,7 +22,7 @@
#include <gio/gio.h>
-#include <rtmp/rtmpserverconnection.h>
+#include <rtmp/rtmpconnection.h>
G_BEGIN_DECLS
@@ -54,9 +54,9 @@ struct _GstRtmpServerClass
/* signals */
void (*add_connection) (GstRtmpServer *server,
- GstRtmpServerConnection *connection);
+ GstRtmpConnection *connection);
void (*remove_connection) (GstRtmpServer *server,
- GstRtmpServerConnection *connection);
+ GstRtmpConnection *connection);
};
GType gst_rtmp_server_get_type (void);
@@ -64,9 +64,9 @@ GType gst_rtmp_server_get_type (void);
GstRtmpServer *gst_rtmp_server_new (void);
void gst_rtmp_server_start (GstRtmpServer * rtmpserver);
void gst_rtmp_server_add_connection (GstRtmpServer *rtmpserver,
- GstRtmpServerConnection *connection);
+ GstRtmpConnection *connection);
void gst_rtmp_server_remove_connection (GstRtmpServer *rtmpserver,
- GstRtmpServerConnection *connection);
+ GstRtmpConnection *connection);
G_END_DECLS
diff --git a/rtmp/rtmpserverconnection.c b/rtmp/rtmpserverconnection.c
deleted file mode 100644
index ef79d19..0000000
--- a/rtmp/rtmpserverconnection.c
+++ /dev/null
@@ -1,527 +0,0 @@
-/* GStreamer RTMP Library
- * Copyright (C) 2013 David Schleef <ds@schleef.org>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
- * Boston, MA 02110-1335, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gst/gst.h>
-#include "rtmpserverconnection.h"
-#include "rtmpchunk.h"
-#include "amf.h"
-
-#include <string.h>
-
-GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_connection_debug_category);
-#define GST_CAT_DEFAULT gst_rtmp_server_connection_debug_category
-
-/* prototypes */
-
-static void gst_rtmp_server_connection_set_property (GObject * object,
- guint property_id, const GValue * value, GParamSpec * pspec);
-static void gst_rtmp_server_connection_get_property (GObject * object,
- guint property_id, GValue * value, GParamSpec * pspec);
-static void gst_rtmp_server_connection_dispose (GObject * object);
-static void gst_rtmp_server_connection_finalize (GObject * object);
-
-static void proxy_connect (GstRtmpServerConnection * sc);
-static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection *
- sc);
-
-enum
-{
- PROP_0
-};
-
-/* class initialization */
-
-G_DEFINE_TYPE_WITH_CODE (GstRtmpServerConnection, gst_rtmp_server_connection,
- G_TYPE_OBJECT,
- GST_DEBUG_CATEGORY_INIT (gst_rtmp_server_connection_debug_category,
- "rtmpserverconnection", 0,
- "debug category for GstRtmpServerConnection class"));
-
-static void
-gst_rtmp_server_connection_class_init (GstRtmpServerConnectionClass * klass)
-{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
-
- gobject_class->set_property = gst_rtmp_server_connection_set_property;
- gobject_class->get_property = gst_rtmp_server_connection_get_property;
- gobject_class->dispose = gst_rtmp_server_connection_dispose;
- gobject_class->finalize = gst_rtmp_server_connection_finalize;
-
- g_signal_new ("got-chunk", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerConnectionClass,
- got_chunk), NULL, NULL, g_cclosure_marshal_generic,
- G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
-}
-
-static void
-gst_rtmp_server_connection_init (GstRtmpServerConnection * rtmpserverconnection)
-{
- rtmpserverconnection->cancellable = g_cancellable_new ();
-}
-
-void
-gst_rtmp_server_connection_set_property (GObject * object, guint property_id,
- const GValue * value, GParamSpec * pspec)
-{
- GstRtmpServerConnection *rtmpserverconnection =
- GST_RTMP_SERVER_CONNECTION (object);
-
- GST_DEBUG_OBJECT (rtmpserverconnection, "set_property");
-
- switch (property_id) {
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
- break;
- }
-}
-
-void
-gst_rtmp_server_connection_get_property (GObject * object, guint property_id,
- GValue * value, GParamSpec * pspec)
-{
- GstRtmpServerConnection *rtmpserverconnection =
- GST_RTMP_SERVER_CONNECTION (object);
-
- GST_DEBUG_OBJECT (rtmpserverconnection, "get_property");
-
- switch (property_id) {
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
- break;
- }
-}
-
-void
-gst_rtmp_server_connection_dispose (GObject * object)
-{
- GstRtmpServerConnection *rtmpserverconnection =
- GST_RTMP_SERVER_CONNECTION (object);
-
- GST_DEBUG_OBJECT (rtmpserverconnection, "dispose");
-
- /* clean up as possible. may be called multiple times */
-
- G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->dispose (object);
-}
-
-void
-gst_rtmp_server_connection_finalize (GObject * object)
-{
- GstRtmpServerConnection *rtmpserverconnection =
- GST_RTMP_SERVER_CONNECTION (object);
-
- GST_DEBUG_OBJECT (rtmpserverconnection, "finalize");
-
- /* clean up object here */
-
- G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->finalize (object);
-}
-
-GstRtmpServerConnection *
-gst_rtmp_server_connection_new (GSocketConnection * connection)
-{
- GstRtmpServerConnection *sc;
-
- sc = g_object_new (GST_TYPE_RTMP_SERVER_CONNECTION, NULL);
- sc->connection = connection;
-
- //proxy_connect (sc);
- //gst_rtmp_server_connection_read_chunk (sc);
- gst_rtmp_server_connection_handshake1 (sc);
-
- return sc;
-}
-
-typedef struct _ChunkRead ChunkRead;
-struct _ChunkRead
-{
- gpointer data;
- gsize alloc_size;
- gsize size;
- GstRtmpServerConnection *server_connection;
-};
-
-
-static void proxy_connect (GstRtmpServerConnection * sc);
-static void proxy_connect_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection *
- sc);
-static void gst_rtmp_server_connection_read_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk);
-static void proxy_write_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void proxy_read_chunk (GstRtmpServerConnection * sc);
-static void proxy_read_done (GObject * obj, GAsyncResult * res,
- gpointer user_data);
-static void gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection *
- sc, ChunkRead * chunk);
-static void gst_rtmp_server_connection_write_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection *
- sc);
-static void gst_rtmp_server_connection_handshake1_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc,
- GBytes * bytes);
-static void gst_rtmp_server_connection_handshake2_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-static void gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection *
- sc);
-static void gst_rtmp_server_connection_handshake3_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-
-
-static void
-proxy_connect (GstRtmpServerConnection * sc)
-{
- GSocketConnectable *addr;
-
- GST_ERROR ("proxy_connect");
-
- addr = g_network_address_new ("localhost", 1935);
-
- sc->socket_client = g_socket_client_new ();
- g_socket_client_connect_async (sc->socket_client, addr,
- sc->cancellable, proxy_connect_done, sc);
-}
-
-static void
-proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data)
-{
- GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
- GError *error = NULL;
-
- GST_ERROR ("proxy_connect_done");
-
- sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client,
- res, &error);
- if (sc->proxy_connection == NULL) {
- GST_ERROR ("connection error: %s", error->message);
- g_error_free (error);
- return;
- }
-
- gst_rtmp_server_connection_read_chunk (sc);
-}
-
-static void
-gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * sc)
-{
- GInputStream *is;
-
- is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
-
- g_input_stream_read_bytes_async (is, 4096,
- G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_server_connection_read_chunk_done, sc);
-}
-
-G_GNUC_UNUSED static void
-parse_message (guint8 * data, int size)
-{
- int offset;
- int bytes_read;
- GstAmfNode *node;
-
- offset = 4;
-
- node = gst_amf_node_new_parse ((const char *) (data + offset),
- size - offset, &bytes_read);
- offset += bytes_read;
- g_print ("bytes_read: %d\n", bytes_read);
- if (node)
- gst_amf_node_free (node);
-
- node = gst_amf_node_new_parse ((const char *) (data + offset),
- size - offset, &bytes_read);
- offset += bytes_read;
- g_print ("bytes_read: %d\n", bytes_read);
- if (node)
- gst_amf_node_free (node);
-
- node = gst_amf_node_new_parse ((const char *) (data + offset),
- size - offset, &bytes_read);
- offset += bytes_read;
- g_print ("bytes_read: %d\n", bytes_read);
- if (node)
- gst_amf_node_free (node);
-
-}
-
-static void
-gst_rtmp_server_connection_read_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data)
-{
- GInputStream *is = G_INPUT_STREAM (obj);
- GstRtmpServerConnection *server_connection =
- GST_RTMP_SERVER_CONNECTION (user_data);
- GstRtmpChunk *chunk;
- GError *error = NULL;
- gsize chunk_size;
- GBytes *bytes;
-
- GST_ERROR ("gst_rtmp_server_connection_read_chunk_done");
-
- bytes = g_input_stream_read_bytes_finish (is, res, &error);
- if (bytes == NULL) {
- GST_ERROR ("read error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
-
- chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size);
-
- if (chunk) {
- g_signal_emit_by_name (server_connection, "got-chunk", chunk);
-
- g_object_unref (chunk);
- }
-}
-
-G_GNUC_UNUSED static void
-proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk)
-{
- GOutputStream *os;
-
- os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection));
-
- g_output_stream_write_async (os, chunk->data, chunk->size,
- G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk);
-}
-
-static void
-proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data)
-{
- GOutputStream *os = G_OUTPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
- GError *error = NULL;
- gssize ret;
-
- GST_ERROR ("proxy_write_done");
-
- ret = g_output_stream_write_finish (os, res, &error);
- if (ret < 0) {
- GST_ERROR ("write error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret);
-
- proxy_read_chunk (chunk->server_connection);
-
- g_free (chunk->data);
- g_free (chunk);
-}
-
-static void
-proxy_read_chunk (GstRtmpServerConnection * sc)
-{
- GInputStream *is;
- ChunkRead *chunk;
-
- chunk = g_malloc0 (sizeof (ChunkRead));
- chunk->alloc_size = 4096;
- chunk->data = g_malloc (chunk->alloc_size);
- chunk->server_connection = sc;
-
- is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection));
-
- g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
- G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk);
-}
-
-static void
-proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data)
-{
- GInputStream *is = G_INPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
- GError *error = NULL;
- gssize ret;
-
- GST_ERROR ("proxy_read_done");
-
- ret = g_input_stream_read_finish (is, res, &error);
- if (ret < 0) {
- GST_ERROR ("read error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret);
- chunk->size = ret;
-
- gst_rtmp_server_connection_write_chunk (chunk->server_connection, chunk);
-}
-
-static void
-gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection * sc,
- ChunkRead * chunk)
-{
- GOutputStream *os;
-
- os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
-
- g_output_stream_write_async (os, chunk->data, chunk->size,
- G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_server_connection_write_chunk_done, chunk);
-}
-
-static void
-gst_rtmp_server_connection_write_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data)
-{
- GOutputStream *os = G_OUTPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
- GError *error = NULL;
- gssize ret;
-
- GST_ERROR ("gst_rtmp_server_connection_write_chunk_done");
-
- ret = g_output_stream_write_finish (os, res, &error);
- if (ret < 0) {
- GST_ERROR ("write error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
-
- gst_rtmp_server_connection_read_chunk (chunk->server_connection);
-
- g_free (chunk->data);
- g_free (chunk);
-}
-
-static void
-gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * sc)
-{
- GInputStream *is;
-
- is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
-
- g_input_stream_read_bytes_async (is, 4096,
- G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_server_connection_handshake1_done, sc);
-}
-
-static void
-gst_rtmp_server_connection_handshake1_done (GObject * obj,
- GAsyncResult * res, gpointer user_data)
-{
- GInputStream *is = G_INPUT_STREAM (obj);
- GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
- GError *error = NULL;
- GBytes *bytes;
-
- GST_ERROR ("gst_rtmp_server_connection_handshake1_done");
-
- bytes = g_input_stream_read_bytes_finish (is, res, &error);
- if (bytes == NULL) {
- GST_ERROR ("read error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
-
- gst_rtmp_server_connection_handshake2 (sc, bytes);
-}
-
-static void
-gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc,
- GBytes * bytes)
-{
- GOutputStream *os;
- guint8 *data;
- GBytes *out_bytes;
-
- os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
-
- data = g_malloc (1 + 1536 + 1536);
- memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
- memset (data + 1537, 0, 8);
- memset (data + 1537 + 8, 0xef, 1528);
-
- out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
-
- g_output_stream_write_bytes_async (os, out_bytes,
- G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_server_connection_handshake2_done, sc);
-}
-
-static void
-gst_rtmp_server_connection_handshake2_done (GObject * obj,
- GAsyncResult * res, gpointer user_data)
-{
- GOutputStream *os = G_OUTPUT_STREAM (obj);
- GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
- GError *error = NULL;
- gssize ret;
-
- GST_ERROR ("gst_rtmp_server_connection_handshake2_done");
-
- ret = g_output_stream_write_bytes_finish (os, res, &error);
- if (ret < 1 + 1536 + 1536) {
- GST_ERROR ("read error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
-
- gst_rtmp_server_connection_handshake3 (sc);
-}
-
-static void
-gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection * sc)
-{
- GInputStream *is;
-
- is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
-
- g_input_stream_read_bytes_async (is, 1536,
- G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_server_connection_handshake3_done, sc);
-}
-
-static void
-gst_rtmp_server_connection_handshake3_done (GObject * obj,
- GAsyncResult * res, gpointer user_data)
-{
- GInputStream *is = G_INPUT_STREAM (obj);
- GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
- GError *error = NULL;
- GBytes *bytes;
-
- GST_ERROR ("gst_rtmp_server_connection_handshake3_done");
-
- bytes = g_input_stream_read_bytes_finish (is, res, &error);
- if (bytes == NULL) {
- GST_ERROR ("read error: %s", error->message);
- g_error_free (error);
- return;
- }
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
-
- gst_rtmp_server_connection_read_chunk (sc);
- (void) &proxy_connect;
-}
diff --git a/rtmp/rtmpserverconnection.h b/rtmp/rtmpserverconnection.h
deleted file mode 100644
index 19f06ab..0000000
--- a/rtmp/rtmpserverconnection.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/* GStreamer RTMP Library
- * Copyright (C) 2013 David Schleef <ds@schleef.org>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef _GST_RTMP_SERVER_CONNECTION_H_
-#define _GST_RTMP_SERVER_CONNECTION_H_
-
-#include <gio/gio.h>
-#include <rtmp/rtmpchunk.h>
-
-G_BEGIN_DECLS
-
-#define GST_TYPE_RTMP_SERVER_CONNECTION (gst_rtmp_server_connection_get_type())
-#define GST_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnection))
-#define GST_RTMP_SERVER_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnectionClass))
-#define GST_IS_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_SERVER_CONNECTION))
-#define GST_IS_RTMP_SERVER_CONNECTION_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_SERVER_CONNECTION))
-
-typedef struct _GstRtmpServerConnection GstRtmpServerConnection;
-typedef struct _GstRtmpServerConnectionClass GstRtmpServerConnectionClass;
-
-struct _GstRtmpServerConnection
-{
- GObject object;
-
- /* private */
- GSocketConnection *connection;
- GSocketConnection *proxy_connection;
- GCancellable *cancellable;
- int state;
- GSocketClient *socket_client;
-};
-
-struct _GstRtmpServerConnectionClass
-{
- GObjectClass object_class;
-
- /* signals */
- void (*got_chunk) (GstRtmpServerConnection *connection,
- GstRtmpChunk *chunk);
-};
-
-GType gst_rtmp_server_connection_get_type (void);
-
-GstRtmpServerConnection *gst_rtmp_server_connection_new (
- GSocketConnection *connection);
-
-G_END_DECLS
-
-#endif
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 37629af..fe8fc18 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -29,10 +29,10 @@
#define GETTEXT_PACKAGE NULL
static void
-add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection,
+add_connection (GstRtmpServer * server, GstRtmpConnection * connection,
gpointer user_data);
static void
-got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk,
+got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data);
static void dump_data (GBytes * bytes);
@@ -74,7 +74,7 @@ main (int argc, char *argv[])
}
static void
-add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection,
+add_connection (GstRtmpServer * server, GstRtmpConnection * connection,
gpointer user_data)
{
GST_ERROR ("new connection");
@@ -83,7 +83,7 @@ add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection,
}
static void
-got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk,
+got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data)
{
GBytes *bytes;