summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-23 11:21:25 -0700
committerDavid Schleef <ds@schleef.org>2014-08-23 11:21:25 -0700
commit45301abe260ed6c517cfc31ff0d6f23a0f1d36d6 (patch)
treefa859bbb20c3419a2c41a1e619857aeb9aa204de
parentfbdab241c6628d3498d587abb9af2e897f4a6855 (diff)
hacking
-rw-r--r--configure.ac2
-rw-r--r--rtmp/Makefile.am17
-rw-r--r--rtmp/amf.c267
-rw-r--r--rtmp/amf.h72
-rw-r--r--rtmp/rtmpchunk.c121
-rw-r--r--rtmp/rtmpchunk.h (renamed from rtmp/rtmppacket.h)35
-rw-r--r--rtmp/rtmpclient.c279
-rw-r--r--rtmp/rtmpclient.h47
-rw-r--r--rtmp/rtmpmessage.c121
-rw-r--r--rtmp/rtmpmessage.h55
-rw-r--r--rtmp/rtmpserver.c55
-rw-r--r--rtmp/rtmpserver.h10
-rw-r--r--rtmp/rtmpserverconnection.c593
-rw-r--r--rtmp/rtmpserverconnection.h60
-rw-r--r--rtmp/rtmpstream.c (renamed from rtmp/rtmppacket.c)60
-rw-r--r--rtmp/rtmpstream.h52
-rw-r--r--tools/Makefile.am12
-rw-r--r--tools/client-test.c88
-rw-r--r--tools/proxy-server.c64
19 files changed, 1950 insertions, 60 deletions
diff --git a/configure.ac b/configure.ac
index 7de7917..b09f07b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -105,7 +105,7 @@ if test "$HAVE_GLIB" != yes ; then
exit 1
fi
-PKG_CHECK_MODULES(GST, gstreamer-$GST_API_VERSION gstreamer-base-$GST_API_VERSION gstreamer-video-$GST_API_VERSION gstreamer-audio-$GST_API_VERSION,
+PKG_CHECK_MODULES(GST, gstreamer-$GST_API_VERSION gstreamer-base-$GST_API_VERSION gstreamer-video-$GST_API_VERSION gstreamer-audio-$GST_API_VERSION gio-2.0,
HAVE_GST=yes, HAVE_GST=no)
if test "$HAVE_GST" != yes ; then
echo GStreamer and gst-plugns-base are needed to build
diff --git a/rtmp/Makefile.am b/rtmp/Makefile.am
index 43cd961..52c061b 100644
--- a/rtmp/Makefile.am
+++ b/rtmp/Makefile.am
@@ -4,6 +4,7 @@ lib_LTLIBRARIES = libgstrtmp-@GST_API_VERSION@.la
gstrtmp_@GST_API_VERSION@_includedir = $(includedir)/gstreamer-@GST_API_VERSION@/rtmp
libgstrtmp_@GST_API_VERSION@_la_CFLAGS = \
+ $(GST_RTMP_CFLAGS) \
$(GST_CFLAGS) \
$(SOUP_CFLAGS)
libgstrtmp_@GST_API_VERSION@_la_LIBADD = \
@@ -16,11 +17,19 @@ libgstrtmp_@GST_API_VERSION@_la_SOURCES = \
$(sources)
sources = \
- rtmpserver.c \
- rtmpserver.h \
+ amf.c \
+ amf.h \
rtmpclient.c \
rtmpclient.h \
rtmpconnection.c \
rtmpconnection.h \
- rtmppacket.c \
- rtmppacket.h
+ rtmpmessage.c \
+ rtmpmessage.h \
+ rtmpchunk.c \
+ rtmpchunk.h \
+ rtmpserver.c \
+ rtmpserver.h \
+ rtmpserverconnection.c \
+ rtmpserverconnection.h \
+ rtmpstream.c \
+ rtmpstream.h
diff --git a/rtmp/amf.c b/rtmp/amf.c
new file mode 100644
index 0000000..153f663
--- /dev/null
+++ b/rtmp/amf.c
@@ -0,0 +1,267 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+
+#include "amf.h"
+
+typedef struct _AmfObjectField AmfObjectField;
+struct _AmfObjectField
+{
+ char *name;
+ GstAmfNode *value;
+};
+
+typedef struct _AmfParser AmfParser;
+struct _AmfParser
+{
+ const char *data;
+ gsize size;
+ int offset;
+ gboolean error;
+};
+
+static char *_parse_utf8_string (AmfParser * parser);
+static void _parse_object (AmfParser * parser, GstAmfNode * node);
+static GstAmfNode *_parse_value (AmfParser * parser);
+static void amf_object_field_free (AmfObjectField * field);
+
+
+GstAmfNode *
+gst_amf_node_new (GstAmfType type)
+{
+ GstAmfNode *node;
+
+ node = g_malloc0 (sizeof (GstAmfNode));
+ node->type = type;
+ if (node->type == GST_AMF_TYPE_OBJECT) {
+ node->array_val = g_ptr_array_new ();
+ }
+
+ return node;
+}
+
+void
+gst_amf_node_free (GstAmfNode * node)
+{
+ if (node->type == GST_AMF_TYPE_STRING) {
+ g_free (node->string_val);
+ } else if (node->type == GST_AMF_TYPE_OBJECT) {
+ g_ptr_array_foreach (node->array_val, (GFunc) amf_object_field_free, NULL);
+ g_ptr_array_free (node->array_val, TRUE);
+ }
+
+ g_free (node);
+}
+
+static int
+_parse_u8 (AmfParser * parser)
+{
+ int x;
+ x = parser->data[parser->offset];
+ parser->offset++;
+ return x;
+}
+
+static int
+_parse_u16 (AmfParser * parser)
+{
+ int x;
+ x = (parser->data[parser->offset] << 8) | parser->data[parser->offset + 1];
+ parser->offset += 2;
+ return x;
+}
+
+#if 0
+static int
+_parse_u24 (AmfParser * parser)
+{
+ int x;
+ x = (parser->data[parser->offset] << 16) |
+ (parser->data[parser->offset + 1] << 8) | parser->data[parser->offset +
+ 2];
+ parser->offset += 3;
+ return x;
+}
+
+static int
+_parse_u32 (AmfParser * parser)
+{
+ int x;
+ x = (parser->data[parser->offset] << 24) |
+ (parser->data[parser->offset + 1] << 16) |
+ (parser->data[parser->offset + 2] << 8) | parser->data[parser->offset +
+ 3];
+ parser->offset += 4;
+ return x;
+}
+#endif
+
+static int
+_parse_double (AmfParser * parser)
+{
+ double d;
+ int i;
+ guint8 *d_ptr = (guint8 *) & d;
+ for (i = 0; i < 8; i++) {
+ d_ptr[i] = parser->data[parser->offset + (7 - i)];
+ }
+ parser->offset += 8;
+ return d;
+}
+
+static char *
+_parse_utf8_string (AmfParser * parser)
+{
+ int size;
+ char *s;
+
+ size = _parse_u16 (parser);
+ if (parser->offset + size >= parser->size) {
+ parser->error = TRUE;
+ return NULL;
+ }
+ s = g_strndup (parser->data + parser->offset, size);
+ parser->offset += size;
+
+ return s;
+}
+
+static void
+_parse_object (AmfParser * parser, GstAmfNode * node)
+{
+ while (1) {
+ char *s;
+ GstAmfNode *child_node;
+ s = _parse_utf8_string (parser);
+ child_node = _parse_value (parser);
+ if (child_node->type == GST_AMF_TYPE_OBJECT_END) {
+ gst_amf_node_free (child_node);
+ break;
+ }
+ gst_amf_object_append_take (node, s, child_node);
+ }
+}
+
+static GstAmfNode *
+_parse_value (AmfParser * parser)
+{
+ GstAmfNode *node = NULL;
+ GstAmfType type;
+
+ type = _parse_u8 (parser);
+ node = gst_amf_node_new (type);
+
+ GST_DEBUG ("parsing type %d", type);
+
+ switch (type) {
+ case GST_AMF_TYPE_NUMBER:
+ gst_amf_node_set_double (node, _parse_double (parser));
+ break;
+ case GST_AMF_TYPE_BOOLEAN:
+ gst_amf_node_set_boolean (node, _parse_u8 (parser));
+ break;
+ case GST_AMF_TYPE_STRING:
+ gst_amf_node_set_string_take (node, _parse_utf8_string (parser));
+ break;
+ case GST_AMF_TYPE_OBJECT:
+ _parse_object (parser, node);
+ break;
+ case GST_AMF_TYPE_MOVIECLIP:
+ GST_ERROR ("unimplemented");
+ break;
+ case GST_AMF_TYPE_NULL:
+ break;
+ case GST_AMF_TYPE_OBJECT_END:
+ break;
+ default:
+ GST_ERROR ("unimplemented");
+ break;
+ }
+
+ return node;
+}
+
+GstAmfNode *
+gst_amf_node_new_parse (const char *data, int size, int *n_bytes)
+{
+ AmfParser _p = { 0 }, *parser = &_p;
+ GstAmfNode *node;
+
+ parser->data = data;
+ parser->size = size;
+ node = _parse_value (parser);
+
+ if (n_bytes)
+ *n_bytes = parser->offset;
+ return node;
+}
+
+void
+gst_amf_node_set_boolean (GstAmfNode * node, gboolean val)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_BOOLEAN);
+ node->int_val = val;
+}
+
+void
+gst_amf_node_set_double (GstAmfNode * node, double val)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_NUMBER);
+ node->double_val = val;
+}
+
+void
+gst_amf_node_set_string (GstAmfNode * node, const char *s)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_STRING);
+ node->string_val = g_strdup (s);
+}
+
+void
+gst_amf_node_set_string_take (GstAmfNode * node, char *s)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_STRING);
+ node->string_val = s;
+}
+
+void
+gst_amf_object_append_take (GstAmfNode * node, char *s, GstAmfNode * child_node)
+{
+ AmfObjectField *field;
+
+ g_return_if_fail (node->type == GST_AMF_TYPE_OBJECT);
+
+ field = g_malloc0 (sizeof (AmfObjectField));
+ field->name = s;
+ field->value = child_node;
+ g_ptr_array_add (node->array_val, field);
+}
+
+static void
+amf_object_field_free (AmfObjectField * field)
+{
+ g_free (field->name);
+ gst_amf_node_free (field->value);
+ g_free (field);
+}
diff --git a/rtmp/amf.h b/rtmp/amf.h
new file mode 100644
index 0000000..00b092a
--- /dev/null
+++ b/rtmp/amf.h
@@ -0,0 +1,72 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_AMF_H_
+#define _GST_RTMP_AMF_H_
+
+#include <glib.h>
+
+G_BEGIN_DECLS
+
+typedef enum {
+ GST_AMF_TYPE_NUMBER = 0,
+ GST_AMF_TYPE_BOOLEAN = 1,
+ GST_AMF_TYPE_STRING = 2,
+ GST_AMF_TYPE_OBJECT = 3,
+ GST_AMF_TYPE_MOVIECLIP = 4,
+ GST_AMF_TYPE_NULL = 5,
+ GST_AMF_TYPE_UNDEFINED = 6,
+ GST_AMF_TYPE_REFERENCE = 7,
+ GST_AMF_TYPE_ECMA_ARRAY = 8,
+ GST_AMF_TYPE_OBJECT_END = 9,
+ GST_AMF_TYPE_STRICT_ARRAY = 10,
+ GST_AMF_TYPE_DATE = 11,
+ GST_AMF_TYPE_LONG_STRING = 12,
+ GST_AMF_TYPE_UNSUPPORTED = 13,
+ GST_AMF_TYPE_RECORDSET = 14,
+ GST_AMF_TYPE_XML_DOCUMENT = 15,
+ GST_AMF_TYPE_TYPED_OBJECT = 16,
+ GST_AMF_TYPE_AVMPLUS_OBJECT = 17
+} GstAmfType;
+
+
+struct _GstAmfNode {
+ GstAmfType type;
+ int int_val;
+ double double_val;
+ char *string_val;
+ GPtrArray *array_val;
+};
+typedef struct _GstAmfNode GstAmfNode;
+
+GstAmfNode * gst_amf_node_new (GstAmfType type);
+void gst_amf_node_free (GstAmfNode *node);
+
+GstAmfNode * gst_amf_node_new_parse (const char *data, int size, int *n_bytes);
+
+void gst_amf_node_set_boolean (GstAmfNode *node, gboolean val);
+void gst_amf_node_set_double (GstAmfNode *node, double val);
+void gst_amf_node_set_string (GstAmfNode *node, const char *s);
+void gst_amf_node_set_string_take (GstAmfNode *node, char *s);
+void gst_amf_object_append_take (GstAmfNode *node, char *s, GstAmfNode *child_node);
+
+
+G_END_DECLS
+
+#endif
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
new file mode 100644
index 0000000..de80211
--- /dev/null
+++ b/rtmp/rtmpchunk.c
@@ -0,0 +1,121 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include "rtmpchunk.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_chunk_debug_category
+
+/* prototypes */
+
+
+static void gst_rtmp_chunk_set_property (GObject * object,
+ guint property_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtmp_chunk_get_property (GObject * object,
+ guint property_id, GValue * value, GParamSpec * pspec);
+static void gst_rtmp_chunk_dispose (GObject * object);
+static void gst_rtmp_chunk_finalize (GObject * object);
+
+
+enum
+{
+ PROP_0
+};
+
+/* class initialization */
+
+G_DEFINE_TYPE_WITH_CODE (GstRtmpChunk, gst_rtmp_chunk, G_TYPE_OBJECT,
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_debug_category, "rtmpchunk", 0,
+ "debug category for rtmpchunk element"));
+
+static void
+gst_rtmp_chunk_class_init (GstRtmpChunkClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->set_property = gst_rtmp_chunk_set_property;
+ gobject_class->get_property = gst_rtmp_chunk_get_property;
+ gobject_class->dispose = gst_rtmp_chunk_dispose;
+ gobject_class->finalize = gst_rtmp_chunk_finalize;
+
+}
+
+static void
+gst_rtmp_chunk_init (GstRtmpChunk * rtmpchunk)
+{
+}
+
+void
+gst_rtmp_chunk_set_property (GObject * object, guint property_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object);
+
+ GST_DEBUG_OBJECT (rtmpchunk, "set_property");
+
+ switch (property_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+void
+gst_rtmp_chunk_get_property (GObject * object, guint property_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object);
+
+ GST_DEBUG_OBJECT (rtmpchunk, "get_property");
+
+ switch (property_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+void
+gst_rtmp_chunk_dispose (GObject * object)
+{
+ GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object);
+
+ GST_DEBUG_OBJECT (rtmpchunk, "dispose");
+
+ /* clean up as possible. may be called multiple times */
+
+ G_OBJECT_CLASS (gst_rtmp_chunk_parent_class)->dispose (object);
+}
+
+void
+gst_rtmp_chunk_finalize (GObject * object)
+{
+ GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object);
+
+ GST_DEBUG_OBJECT (rtmpchunk, "finalize");
+
+ /* clean up object here */
+
+ G_OBJECT_CLASS (gst_rtmp_chunk_parent_class)->finalize (object);
+}
diff --git a/rtmp/rtmppacket.h b/rtmp/rtmpchunk.h
index 2113dc6..5e799ee 100644
--- a/rtmp/rtmppacket.h
+++ b/rtmp/rtmpchunk.h
@@ -17,39 +17,40 @@
* Boston, MA 02110-1301, USA.
*/
-#ifndef _GST_RTMP_PACKET_H_
-#define _GST_RTMP_PACKET_H_
+#ifndef _GST_RTMP_CHUNK_H_
+#define _GST_RTMP_CHUNK_H_
G_BEGIN_DECLS
-#define GST_TYPE_RTMP_PACKET (gst_rtmp_packet_get_type())
-#define GST_RTMP_PACKET(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_PACKET,GstRtmpPacket))
-#define GST_RTMP_PACKET_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_PACKET,GstRtmpPacketClass))
-#define GST_IS_RTMP_PACKET(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_PACKET))
-#define GST_IS_RTMP_PACKET_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_PACKET))
+#define GST_TYPE_RTMP_CHUNK (gst_rtmp_chunk_get_type())
+#define GST_RTMP_CHUNK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_CHUNK,GstRtmpChunk))
+#define GST_RTMP_CHUNK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_CHUNK,GstRtmpChunkClass))
+#define GST_IS_RTMP_CHUNK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_CHUNK))
+#define GST_IS_RTMP_CHUNK_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_CHUNK))
-typedef struct _GstRtmpPacket GstRtmpPacket;
-typedef struct _GstRtmpPacketClass GstRtmpPacketClass;
+typedef struct _GstRtmpChunk GstRtmpChunk;
+typedef struct _GstRtmpChunkClass GstRtmpChunkClass;
-struct _GstRtmpPacket
+struct _GstRtmpChunk
{
GObject object;
- char *request_data;
- gsize request_length;
-
- char *response_data;
- gsize response_length;
+ guint32 timestamp;
+ int type_id;
+ guint32 chunk_stream_id;
+ guint32 stream_id;
+ int length;
+ guint8 *data;
};
-struct _GstRtmpPacketClass
+struct _GstRtmpChunkClass
{
GObjectClass object_class;
};
-GType gst_rtmp_packet_get_type (void);
+GType gst_rtmp_chunk_get_type (void);
G_END_DECLS
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 2fde5fe..9db1519 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -22,6 +22,8 @@
#endif
#include <gst/gst.h>
+#include <gio/gio.h>
+#include <string.h>
#include "rtmpclient.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
@@ -36,6 +38,21 @@ static void gst_rtmp_client_get_property (GObject * object,
static void gst_rtmp_client_dispose (GObject * object);
static void gst_rtmp_client_finalize (GObject * object);
+static void
+gst_rtmp_client_connect_start (GstRtmpClient * client,
+ GCancellable * cancellable, GSimpleAsyncResult * async);
+static void
+gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void
+gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void
+gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void
+gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result,
+ gpointer user_data);
enum
{
@@ -120,3 +137,265 @@ gst_rtmp_client_finalize (GObject * object)
G_OBJECT_CLASS (gst_rtmp_client_parent_class)->finalize (object);
}
+
+/* API */
+
+GstRtmpClient *
+gst_rtmp_client_new (void)
+{
+
+ return g_object_new (GST_TYPE_RTMP_CLIENT, NULL);
+
+}
+
+void
+gst_rtmp_client_set_host (GstRtmpClient * client, const char *host)
+{
+ g_free (client->host);
+ client->host = g_strdup (host);
+}
+
+void
+gst_rtmp_client_set_stream (GstRtmpClient * client, const char *stream)
+{
+ g_free (client->stream);
+ client->stream = g_strdup (stream);
+}
+
+void
+gst_rtmp_client_connect_async (GstRtmpClient * client,
+ GCancellable * cancellable, GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *simple;
+
+ if (client->state != GST_RTMP_CLIENT_STATE_NEW) {
+ g_simple_async_report_error_in_idle (G_OBJECT (client),
+ callback, user_data, GST_RTMP_ERROR,
+ GST_RTMP_ERROR_TOO_LAZY, "already connected");
+ return;
+ }
+
+ simple = g_simple_async_result_new (G_OBJECT (client),
+ callback, user_data, gst_rtmp_client_connect_async);
+
+ gst_rtmp_client_connect_start (client, cancellable, simple);
+}
+
+gboolean
+gst_rtmp_client_connect_finish (GstRtmpClient * client,
+ GAsyncResult * result, GError ** error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (g_simple_async_result_is_valid (result,
+ G_OBJECT (client), gst_rtmp_client_connect_async), FALSE);
+
+ simple = (GSimpleAsyncResult *) result;
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+
+ return TRUE;
+}
+
+
+/* async implementation */
+
+/* connect */
+
+typedef struct _ConnectContext ConnectContext;
+struct _ConnectContext
+{
+ GstRtmpClient *client;
+ GSimpleAsyncResult *async;
+ GCancellable *cancellable;
+
+ GSocketClient *socket_client;
+ GSocketConnection *connection;
+ guint8 *handshake_send;
+ guint8 *handshake_recv;
+};
+
+static void
+gst_rtmp_client_connect_complete (ConnectContext * context)
+{
+ GSimpleAsyncResult *async = context->async;
+
+ if (context->socket_client)
+ g_object_unref (context->socket_client);
+ if (context->connection)
+ g_object_unref (context->connection);
+ g_free (context->handshake_send);
+ g_free (context->handshake_recv);
+ g_free (context);
+ g_simple_async_result_complete (async);
+}
+
+static void
+gst_rtmp_client_connect_start (GstRtmpClient * client,
+ GCancellable * cancellable, GSimpleAsyncResult * async)
+{
+ ConnectContext *context;
+ GSocketConnectable *addr;
+
+ context = g_malloc0 (sizeof (ConnectContext));
+ context->cancellable = cancellable;
+ context->client = client;
+ context->async = async;
+
+ addr = g_network_address_new ("localhost", 1935);
+ context->socket_client = g_socket_client_new ();
+
+ GST_ERROR ("g_socket_client_connect_async");
+ g_socket_client_connect_async (context->socket_client, addr,
+ context->cancellable, gst_rtmp_client_connect_1, context);
+}
+
+
+static void
+gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ ConnectContext *context = user_data;
+ GError *error = NULL;
+ GOutputStream *os;
+
+ GST_ERROR ("g_socket_client_connect_finish");
+ context->connection = g_socket_client_connect_finish (context->socket_client,
+ result, &error);
+ if (context->connection == NULL) {
+ GST_ERROR ("error");
+ g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
+ g_error_free (error);
+ gst_rtmp_client_connect_complete (context);
+ return;
+ }
+
+ context->handshake_send = g_malloc (1537);
+ context->handshake_send[0] = 3;
+ memset (context->handshake_send + 1, 0, 8);
+ memset (context->handshake_send + 9, 0xa5, 1528);
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection));
+
+ GST_ERROR ("g_output_stream_write_async");
+ g_output_stream_write_async (os, context->handshake_send, 1537,
+ G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_2,
+ context);
+}
+
+static void
+gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ ConnectContext *context = user_data;
+ GOutputStream *os;
+ GInputStream *is;
+ GError *error = NULL;
+ gssize n;
+
+ GST_ERROR ("g_output_stream_write_finish");
+ os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection));
+ n = g_output_stream_write_finish (os, result, &error);
+ GST_ERROR ("wrote %" G_GSIZE_FORMAT " bytes", n);
+ if (error) {
+ GST_ERROR ("error");
+ g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
+ g_error_free (error);
+ gst_rtmp_client_connect_complete (context);
+ return;
+ }
+
+ context->handshake_recv = g_malloc (1537);
+
+ GST_ERROR ("g_input_stream_read_async");
+ is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
+ g_input_stream_read_async (is, context->handshake_recv, 1537,
+ G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_3,
+ context);
+}
+
+static void
+gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ ConnectContext *context = user_data;
+ GInputStream *is;
+ GError *error = NULL;
+ gssize n;
+
+ GST_ERROR ("g_input_stream_read_finish");
+ is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
+ n = g_input_stream_read_finish (is, result, &error);
+ GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n);
+ if (error) {
+ GST_ERROR ("error");
+ g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
+ g_error_free (error);
+ gst_rtmp_client_connect_complete (context);
+ return;
+ }
+#if 0
+ GST_ERROR ("recv: %02x %02x %02x %02x %02x %02x %02x %02x %02x",
+ context->handshake_recv[0], context->handshake_recv[1],
+ context->handshake_recv[2], context->handshake_recv[3],
+ context->handshake_recv[4], context->handshake_recv[5],
+ context->handshake_recv[6], context->handshake_recv[7],
+ context->handshake_recv[8]);
+#endif
+
+ n = g_output_stream_write (g_io_stream_get_output_stream (G_IO_STREAM
+ (context->connection)), context->handshake_recv + 1, 1536,
+ context->cancellable, &error);
+ if (n < 1536) {
+ GST_ERROR ("error");
+ g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
+ g_error_free (error);
+ gst_rtmp_client_connect_complete (context);
+ return;
+ }
+
+ GST_ERROR ("g_input_stream_read_async");
+ is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
+ g_input_stream_read_async (is, context->handshake_recv + 1, 1536,
+ G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_4,
+ context);
+
+}
+
+static void
+gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ ConnectContext *context = user_data;
+ GInputStream *is;
+ GError *error = NULL;
+ gssize n;
+
+ GST_ERROR ("g_input_stream_read_finish");
+ is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection));
+ n = g_input_stream_read_finish (is, result, &error);
+ GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n);
+ if (error) {
+ GST_ERROR ("error");
+ g_simple_async_result_set_error (context->async, GST_RTMP_ERROR,
+ GST_RTMP_ERROR_TOO_LAZY, "%s", error->message);
+ g_error_free (error);
+ gst_rtmp_client_connect_complete (context);
+ return;
+ }
+
+ context->client->socket_client = context->socket_client;
+ context->socket_client = NULL;
+ context->client->connection = context->connection;
+ context->connection = NULL;
+ context->client->state = GST_RTMP_CLIENT_STATE_CONNECTED;
+
+ GST_ERROR ("got here");
+ gst_rtmp_client_connect_complete (context);
+}
diff --git a/rtmp/rtmpclient.h b/rtmp/rtmpclient.h
index d9f078f..8a1b8b1 100644
--- a/rtmp/rtmpclient.h
+++ b/rtmp/rtmpclient.h
@@ -20,7 +20,8 @@
#ifndef _GST_RTMP_CLIENT_H_
#define _GST_RTMP_CLIENT_H_
-#include <rtmp/rtmppacket.h>
+#include <rtmp/rtmpmessage.h>
+#include <rtmp/rtmpchunk.h>
G_BEGIN_DECLS
@@ -33,22 +34,42 @@ G_BEGIN_DECLS
typedef struct _GstRtmpClient GstRtmpClient;
typedef struct _GstRtmpClientClass GstRtmpClientClass;
-typedef void (*GstRtmpClientCallback) (GstRtmpClient *client,
- GstRtmpPacket *packet, gpointer user_data);
+typedef void (*GstRtmpClientMessageCallback) (GstRtmpClient *client,
+ GstRtmpMessage *message, gpointer user_data);
+typedef void (*GstRtmpClientChunkCallback) (GstRtmpClient *client,
+ GstRtmpChunk *chunk, gpointer user_data);
+
+#define GST_RTMP_ERROR g_quark_from_static_string ("GstRtmpError")
+
+enum {
+ GST_RTMP_ERROR_TOO_LAZY = 0
+};
+
+typedef enum {
+ GST_RTMP_CLIENT_STATE_NEW,
+ GST_RTMP_CLIENT_STATE_CONNECTING,
+ GST_RTMP_CLIENT_STATE_CONNECTED,
+} GstRtmpClientState;
+
struct _GstRtmpClient
{
GObject object;
/* properties */
- char *server_host;
-
+ char *host;
+ int port;
+ char *stream;
/* private */
+ GstRtmpClientState state;
GMutex lock;
GCond cond;
GMainContext *context;
+ GSocketClient *socket_client;
+ GSocketConnection *connection;
+
};
struct _GstRtmpClientClass
@@ -56,7 +77,8 @@ struct _GstRtmpClientClass
GObjectClass object_class;
/* signals */
- void (*got_packet) (GstRtmpClient *client, GstRtmpPacket *packet);
+ void (*got_chunk) (GstRtmpClient *client, GstRtmpChunk *chunk);
+ void (*got_message) (GstRtmpClient *client, GstRtmpMessage *message);
};
@@ -65,8 +87,17 @@ GType gst_rtmp_client_get_type (void);
GstRtmpClient *gst_rtmp_client_new (void);
void gst_rtmp_client_set_url (GstRtmpClient *client, const char *url);
-void gst_rtmp_client_queue_packet (GstRtmpClient *client,
- GstRtmpPacket *packet, GstRtmpClientCallback callback,
+void gst_rtmp_client_connect_async (GstRtmpClient *client,
+ GCancellable *cancellable, GAsyncReadyCallback callback,
+ gpointer user_data);
+gboolean gst_rtmp_client_connect_finish (GstRtmpClient *client,
+ GAsyncResult *result, GError **error);
+
+void gst_rtmp_client_queue_message (GstRtmpClient *client,
+ GstRtmpMessage *message, GstRtmpClientMessageCallback callback,
+ gpointer user_data);
+void gst_rtmp_client_queue_chunk (GstRtmpClient *client,
+ GstRtmpChunk *Chunk, GstRtmpClientChunkCallback callback,
gpointer user_data);
diff --git a/rtmp/rtmpmessage.c b/rtmp/rtmpmessage.c
new file mode 100644
index 0000000..0ba1255
--- /dev/null
+++ b/rtmp/rtmpmessage.c
@@ -0,0 +1,121 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include "rtmpmessage.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_message_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_message_debug_category
+
+/* prototypes */
+
+
+static void gst_rtmp_message_set_property (GObject * object,
+ guint property_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtmp_message_get_property (GObject * object,
+ guint property_id, GValue * value, GParamSpec * pspec);
+static void gst_rtmp_message_dispose (GObject * object);
+static void gst_rtmp_message_finalize (GObject * object);
+
+
+enum
+{
+ PROP_0
+};
+
+/* class initialization */
+
+G_DEFINE_TYPE_WITH_CODE (GstRtmpMessage, gst_rtmp_message, G_TYPE_OBJECT,
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_message_debug_category, "rtmpmessage", 0,
+ "debug category for rtmpmessage element"));
+
+static void
+gst_rtmp_message_class_init (GstRtmpMessageClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->set_property = gst_rtmp_message_set_property;
+ gobject_class->get_property = gst_rtmp_message_get_property;
+ gobject_class->dispose = gst_rtmp_message_dispose;
+ gobject_class->finalize = gst_rtmp_message_finalize;
+
+}
+
+static void
+gst_rtmp_message_init (GstRtmpMessage * rtmpmessage)
+{
+}
+
+void
+gst_rtmp_message_set_property (GObject * object, guint property_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object);
+
+ GST_DEBUG_OBJECT (rtmpmessage, "set_property");
+
+ switch (property_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+void
+gst_rtmp_message_get_property (GObject * object, guint property_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object);
+
+ GST_DEBUG_OBJECT (rtmpmessage, "get_property");
+
+ switch (property_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+void
+gst_rtmp_message_dispose (GObject * object)
+{
+ GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object);
+
+ GST_DEBUG_OBJECT (rtmpmessage, "dispose");
+
+ /* clean up as possible. may be called multiple times */
+
+ G_OBJECT_CLASS (gst_rtmp_message_parent_class)->dispose (object);
+}
+
+void
+gst_rtmp_message_finalize (GObject * object)
+{
+ GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object);
+
+ GST_DEBUG_OBJECT (rtmpmessage, "finalize");
+
+ /* clean up object here */
+
+ G_OBJECT_CLASS (gst_rtmp_message_parent_class)->finalize (object);
+}
diff --git a/rtmp/rtmpmessage.h b/rtmp/rtmpmessage.h
new file mode 100644
index 0000000..053d1db
--- /dev/null
+++ b/rtmp/rtmpmessage.h
@@ -0,0 +1,55 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_MESSAGE_H_
+#define _GST_RTMP_MESSAGE_H_
+
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP_MESSAGE (gst_rtmp_message_get_type())
+#define GST_RTMP_MESSAGE(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_MESSAGE,GstRtmpMessage))
+#define GST_RTMP_MESSAGE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_MESSAGE,GstRtmpMessageClass))
+#define GST_IS_RTMP_MESSAGE(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_MESSAGE))
+#define GST_IS_RTMP_MESSAGE_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_MESSAGE))
+
+typedef struct _GstRtmpMessage GstRtmpMessage;
+typedef struct _GstRtmpMessageClass GstRtmpMessageClass;
+
+struct _GstRtmpMessage
+{
+ GObject object;
+
+ int message_type;
+ guint32 payload_length;
+ guint32 timestamp;
+ guint32 stream_id;
+
+};
+
+struct _GstRtmpMessageClass
+{
+ GObjectClass object_class;
+};
+
+GType gst_rtmp_message_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c
index 3c6bc88..0b089b4 100644
--- a/rtmp/rtmpserver.c
+++ b/rtmp/rtmpserver.c
@@ -23,6 +23,7 @@
#include <gst/gst.h>
#include "rtmpserver.h"
+#include "rtmpserverconnection.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_server_debug_category
@@ -35,6 +36,9 @@ static void gst_rtmp_server_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp_server_dispose (GObject * object);
static void gst_rtmp_server_finalize (GObject * object);
+static gboolean gst_rtmp_server_incoming (GSocketService * service,
+ GSocketConnection * connection, GObject * source_object,
+ gpointer user_data);
enum
@@ -63,6 +67,7 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass)
static void
gst_rtmp_server_init (GstRtmpServer * rtmpserver)
{
+ rtmpserver->port = 11935;
}
void
@@ -118,3 +123,53 @@ gst_rtmp_server_finalize (GObject * object)
G_OBJECT_CLASS (gst_rtmp_server_parent_class)->finalize (object);
}
+
+GstRtmpServer *
+gst_rtmp_server_new (void)
+{
+ return g_object_new (GST_TYPE_RTMP_SERVER, NULL);
+}
+
+void
+gst_rtmp_server_start (GstRtmpServer * rtmpserver)
+{
+ gboolean ret;
+ GError *error = NULL;
+
+ if (rtmpserver->socket_service) {
+ GST_ERROR ("rtmp server already started");
+ return;
+ }
+
+ rtmpserver->socket_service = g_socket_service_new ();
+
+ ret =
+ g_socket_listener_add_inet_port (G_SOCKET_LISTENER (rtmpserver->
+ socket_service), rtmpserver->port, NULL, &error);
+ if (!ret) {
+ GST_ERROR ("failed to add address: %s", error->message);
+ g_object_unref (rtmpserver->socket_service);
+ rtmpserver->socket_service = NULL;
+ return;
+ }
+
+ g_signal_connect (rtmpserver->socket_service, "incoming",
+ G_CALLBACK (gst_rtmp_server_incoming), rtmpserver);
+}
+
+static gboolean
+gst_rtmp_server_incoming (GSocketService * service,
+ GSocketConnection * connection, GObject * source_object, gpointer user_data)
+{
+ GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data);
+ GstRtmpServerConnection *server_connection;
+
+ GST_ERROR ("client connected");
+
+ g_object_ref (connection);
+ server_connection = gst_rtmp_server_connection_new (connection);
+ (void) server_connection;
+ (void) rtmpserver;
+
+ return TRUE;
+}
diff --git a/rtmp/rtmpserver.h b/rtmp/rtmpserver.h
index 0437dca..e1e0466 100644
--- a/rtmp/rtmpserver.h
+++ b/rtmp/rtmpserver.h
@@ -20,6 +20,7 @@
#ifndef _GST_RTMP_SERVER_H_
#define _GST_RTMP_SERVER_H_
+#include <gio/gio.h>
G_BEGIN_DECLS
@@ -36,6 +37,12 @@ struct _GstRtmpServer
{
GObject object;
+ /* properties */
+ int port;
+
+ /* private */
+ GSocketService *socket_service;
+
};
struct _GstRtmpServerClass
@@ -45,6 +52,9 @@ struct _GstRtmpServerClass
GType gst_rtmp_server_get_type (void);
+GstRtmpServer *gst_rtmp_server_new (void);
+void gst_rtmp_server_start (GstRtmpServer * rtmpserver);
+
G_END_DECLS
#endif
diff --git a/rtmp/rtmpserverconnection.c b/rtmp/rtmpserverconnection.c
new file mode 100644
index 0000000..b3b08da
--- /dev/null
+++ b/rtmp/rtmpserverconnection.c
@@ -0,0 +1,593 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include "rtmpserverconnection.h"
+#include "amf.h"
+
+#include <string.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_connection_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_server_connection_debug_category
+
+/* prototypes */
+
+static void gst_rtmp_server_connection_set_property (GObject * object,
+ guint property_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtmp_server_connection_get_property (GObject * object,
+ guint property_id, GValue * value, GParamSpec * pspec);
+static void gst_rtmp_server_connection_dispose (GObject * object);
+static void gst_rtmp_server_connection_finalize (GObject * object);
+
+static void proxy_connect (GstRtmpServerConnection * sc);
+static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection *
+ sc);
+
+enum
+{
+ PROP_0
+};
+
+/* class initialization */
+
+G_DEFINE_TYPE_WITH_CODE (GstRtmpServerConnection, gst_rtmp_server_connection,
+ G_TYPE_OBJECT,
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_server_connection_debug_category,
+ "rtmpserverconnection", 0,
+ "debug category for GstRtmpServerConnection class"));
+
+static void
+gst_rtmp_server_connection_class_init (GstRtmpServerConnectionClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->set_property = gst_rtmp_server_connection_set_property;
+ gobject_class->get_property = gst_rtmp_server_connection_get_property;
+ gobject_class->dispose = gst_rtmp_server_connection_dispose;
+ gobject_class->finalize = gst_rtmp_server_connection_finalize;
+
+}
+
+static void
+gst_rtmp_server_connection_init (GstRtmpServerConnection * rtmpserverconnection)
+{
+ rtmpserverconnection->cancellable = g_cancellable_new ();
+}
+
+void
+gst_rtmp_server_connection_set_property (GObject * object, guint property_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtmpServerConnection *rtmpserverconnection =
+ GST_RTMP_SERVER_CONNECTION (object);
+
+ GST_DEBUG_OBJECT (rtmpserverconnection, "set_property");
+
+ switch (property_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+void
+gst_rtmp_server_connection_get_property (GObject * object, guint property_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtmpServerConnection *rtmpserverconnection =
+ GST_RTMP_SERVER_CONNECTION (object);
+
+ GST_DEBUG_OBJECT (rtmpserverconnection, "get_property");
+
+ switch (property_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+void
+gst_rtmp_server_connection_dispose (GObject * object)
+{
+ GstRtmpServerConnection *rtmpserverconnection =
+ GST_RTMP_SERVER_CONNECTION (object);
+
+ GST_DEBUG_OBJECT (rtmpserverconnection, "dispose");
+
+ /* clean up as possible. may be called multiple times */
+
+ G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->dispose (object);
+}
+
+void
+gst_rtmp_server_connection_finalize (GObject * object)
+{
+ GstRtmpServerConnection *rtmpserverconnection =
+ GST_RTMP_SERVER_CONNECTION (object);
+
+ GST_DEBUG_OBJECT (rtmpserverconnection, "finalize");
+
+ /* clean up object here */
+
+ G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->finalize (object);
+}
+
+GstRtmpServerConnection *
+gst_rtmp_server_connection_new (GSocketConnection * connection)
+{
+ GstRtmpServerConnection *sc;
+
+ sc = g_object_new (GST_TYPE_RTMP_SERVER_CONNECTION, NULL);
+ sc->connection = connection;
+
+ //proxy_connect (sc);
+ //gst_rtmp_server_connection_read_chunk (sc);
+ gst_rtmp_server_connection_handshake1 (sc);
+
+ return sc;
+}
+
+static void
+gst_rtmp_server_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+
+typedef struct _ChunkRead ChunkRead;
+struct _ChunkRead
+{
+ gpointer data;
+ gsize alloc_size;
+ gsize size;
+ GstRtmpServerConnection *server_connection;
+};
+
+
+static void proxy_connect (GstRtmpServerConnection * sc);
+static void proxy_connect_done (GObject * obj, GAsyncResult * res,
+ gpointer user_data);
+static void gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection *
+ sc);
+static void gst_rtmp_server_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk);
+static void proxy_write_done (GObject * obj, GAsyncResult * res,
+ gpointer user_data);
+static void proxy_read_chunk (GstRtmpServerConnection * sc);
+static void proxy_read_done (GObject * obj, GAsyncResult * res,
+ gpointer user_data);
+static void gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection *
+ sc, ChunkRead * chunk);
+static void gst_rtmp_server_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection *
+ sc);
+static void gst_rtmp_server_connection_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc,
+ GBytes * bytes);
+static void gst_rtmp_server_connection_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+static void gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection *
+ sc);
+static void gst_rtmp_server_connection_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data);
+
+
+static void
+proxy_connect (GstRtmpServerConnection * sc)
+{
+ GSocketConnectable *addr;
+
+ GST_ERROR ("proxy_connect");
+
+ addr = g_network_address_new ("localhost", 1935);
+
+ sc->socket_client = g_socket_client_new ();
+ g_socket_client_connect_async (sc->socket_client, addr,
+ sc->cancellable, proxy_connect_done, sc);
+}
+
+static void
+proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+{
+ GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
+ GError *error = NULL;
+
+ GST_ERROR ("proxy_connect_done");
+
+ sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client,
+ res, &error);
+ if (sc->proxy_connection == NULL) {
+ GST_ERROR ("connection error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+
+ gst_rtmp_server_connection_read_chunk (sc);
+}
+
+static void
+gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * sc)
+{
+ GInputStream *is;
+ ChunkRead *chunk;
+
+ chunk = g_malloc0 (sizeof (ChunkRead));
+ chunk->alloc_size = 4096;
+ chunk->data = g_malloc (chunk->alloc_size);
+ chunk->server_connection = sc;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+
+ g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_server_connection_read_chunk_done, chunk);
+}
+
+static void
+parse_message (guint8 * data, int size)
+{
+ int offset;
+ int bytes_read;
+ GstAmfNode *node;
+
+ offset = 4;
+
+ node = gst_amf_node_new_parse ((const char *) (data + offset),
+ size - offset, &bytes_read);
+ offset += bytes_read;
+ g_print ("bytes_read: %d\n", bytes_read);
+ if (node)
+ gst_amf_node_free (node);
+
+ node = gst_amf_node_new_parse ((const char *) (data + offset),
+ size - offset, &bytes_read);
+ offset += bytes_read;
+ g_print ("bytes_read: %d\n", bytes_read);
+ if (node)
+ gst_amf_node_free (node);
+
+ node = gst_amf_node_new_parse ((const char *) (data + offset),
+ size - offset, &bytes_read);
+ offset += bytes_read;
+ g_print ("bytes_read: %d\n", bytes_read);
+ if (node)
+ gst_amf_node_free (node);
+
+}
+
+typedef struct _Chunk Chunk;
+struct _Chunk
+{
+ int stream_id;
+ int header_fmt;
+ guint32 timestamp;
+ int message_length;
+ int message_type_id;
+ guint32 message_stream_id;
+};
+
+static void
+parse_chunk (guint8 * data, int size)
+{
+ Chunk _chunk, *chunk = &_chunk;
+ int offset;
+
+ chunk->header_fmt = data[0] >> 6;
+ chunk->stream_id = data[0] & 0x3f;
+ offset = 1;
+ if (chunk->stream_id == 0) {
+ chunk->stream_id = 64 + data[1];
+ offset = 2;
+ } else if (chunk->stream_id == 1) {
+ chunk->stream_id = 64 + data[1] + (data[2] << 8);
+ offset = 3;
+ }
+ chunk->timestamp =
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ chunk->message_length =
+ (data[offset + 3] << 16) | (data[offset + 4] << 8) | data[offset + 5];
+ chunk->message_type_id = data[offset + 6];
+ offset += 7;
+
+ g_print ("header_fmt: %d\n", chunk->header_fmt);
+ g_print ("stream_id: %d\n", chunk->stream_id);
+ g_print ("timestamp: %d\n", chunk->timestamp);
+ g_print ("message_length: %d\n", chunk->message_length);
+ g_print ("message_type_id: %d\n", chunk->message_type_id);
+
+ parse_message (data + offset, size - offset);
+}
+
+static void
+dump_data (guint8 * data, int size)
+{
+ int i, j;
+ for (i = 0; i < size; i += 16) {
+ g_print ("%04x: ", i);
+ for (j = 0; j < 16; j++) {
+ if (i + j < size) {
+ g_print ("%02x ", data[i + j]);
+ } else {
+ g_print (" ");
+ }
+ }
+ for (j = 0; j < 16; j++) {
+ if (i + j < size) {
+ g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.');
+ }
+ }
+ g_print ("\n");
+ }
+}
+
+static void
+gst_rtmp_server_connection_read_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("gst_rtmp_server_connection_read_chunk_done");
+
+ ret = g_input_stream_read_finish (is, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", ret);
+ chunk->size = ret;
+
+ parse_chunk (chunk->data, chunk->size);
+ dump_data (chunk->data, chunk->size);
+
+ if (0)
+ proxy_write_chunk (chunk->server_connection, chunk);
+}
+
+static void
+proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk)
+{
+ GOutputStream *os;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection));
+
+ g_output_stream_write_async (os, chunk->data, chunk->size,
+ G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk);
+}
+
+static void
+proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("proxy_write_done");
+
+ ret = g_output_stream_write_finish (os, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("write error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret);
+
+ proxy_read_chunk (chunk->server_connection);
+
+ g_free (chunk->data);
+ g_free (chunk);
+}
+
+static void
+proxy_read_chunk (GstRtmpServerConnection * sc)
+{
+ GInputStream *is;
+ ChunkRead *chunk;
+
+ chunk = g_malloc0 (sizeof (ChunkRead));
+ chunk->alloc_size = 4096;
+ chunk->data = g_malloc (chunk->alloc_size);
+ chunk->server_connection = sc;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection));
+
+ g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
+ G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk);
+}
+
+static void
+proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("proxy_read_done");
+
+ ret = g_input_stream_read_finish (is, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret);
+ chunk->size = ret;
+
+ gst_rtmp_server_connection_write_chunk (chunk->server_connection, chunk);
+}
+
+static void
+gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection * sc,
+ ChunkRead * chunk)
+{
+ GOutputStream *os;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+
+ g_output_stream_write_async (os, chunk->data, chunk->size,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_server_connection_write_chunk_done, chunk);
+}
+
+static void
+gst_rtmp_server_connection_write_chunk_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ ChunkRead *chunk = (ChunkRead *) user_data;
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("gst_rtmp_server_connection_write_chunk_done");
+
+ ret = g_output_stream_write_finish (os, res, &error);
+ if (ret < 0) {
+ GST_ERROR ("write error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
+
+ gst_rtmp_server_connection_read_chunk (chunk->server_connection);
+
+ g_free (chunk->data);
+ g_free (chunk);
+}
+
+static void
+gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * sc)
+{
+ GInputStream *is;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+
+ g_input_stream_read_bytes_async (is, 4096,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_server_connection_handshake1_done, sc);
+}
+
+static void
+gst_rtmp_server_connection_handshake1_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
+ GError *error = NULL;
+ GBytes *bytes;
+
+ GST_ERROR ("gst_rtmp_server_connection_handshake1_done");
+
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_server_connection_handshake2 (sc, bytes);
+}
+
+static void
+gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc,
+ GBytes * bytes)
+{
+ GOutputStream *os;
+ guint8 *data;
+ GBytes *out_bytes;
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
+
+ data = g_malloc (1 + 1536 + 1536);
+ memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536);
+ memset (data + 1537, 0, 8);
+ memset (data + 1537 + 8, 0xef, 1528);
+
+ out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536);
+
+ g_output_stream_write_bytes_async (os, out_bytes,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_server_connection_handshake2_done, sc);
+}
+
+static void
+gst_rtmp_server_connection_handshake2_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
+ GError *error = NULL;
+ gssize ret;
+
+ GST_ERROR ("gst_rtmp_server_connection_handshake2_done");
+
+ ret = g_output_stream_write_bytes_finish (os, res, &error);
+ if (ret < 1 + 1536 + 1536) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret);
+
+ gst_rtmp_server_connection_handshake3 (sc);
+}
+
+static void
+gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection * sc)
+{
+ GInputStream *is;
+
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+
+ g_input_stream_read_bytes_async (is, 1536,
+ G_PRIORITY_DEFAULT, sc->cancellable,
+ gst_rtmp_server_connection_handshake3_done, sc);
+}
+
+static void
+gst_rtmp_server_connection_handshake3_done (GObject * obj,
+ GAsyncResult * res, gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (obj);
+ GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data);
+ GError *error = NULL;
+ GBytes *bytes;
+
+ GST_ERROR ("gst_rtmp_server_connection_handshake3_done");
+
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
+ GST_ERROR ("read error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ gst_rtmp_server_connection_read_chunk (sc);
+ (void) &proxy_connect;
+}
diff --git a/rtmp/rtmpserverconnection.h b/rtmp/rtmpserverconnection.h
new file mode 100644
index 0000000..ab1f53c
--- /dev/null
+++ b/rtmp/rtmpserverconnection.h
@@ -0,0 +1,60 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_SERVER_CONNECTION_H_
+#define _GST_RTMP_SERVER_CONNECTION_H_
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP_SERVER_CONNECTION (gst_rtmp_server_connection_get_type())
+#define GST_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnection))
+#define GST_RTMP_SERVER_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnectionClass))
+#define GST_IS_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_SERVER_CONNECTION))
+#define GST_IS_RTMP_SERVER_CONNECTION_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_SERVER_CONNECTION))
+
+typedef struct _GstRtmpServerConnection GstRtmpServerConnection;
+typedef struct _GstRtmpServerConnectionClass GstRtmpServerConnectionClass;
+
+struct _GstRtmpServerConnection
+{
+ GObject object;
+
+ /* private */
+ GSocketConnection *connection;
+ GSocketConnection *proxy_connection;
+ GCancellable *cancellable;
+ int state;
+ GSocketClient *socket_client;
+};
+
+struct _GstRtmpServerConnectionClass
+{
+ GObjectClass object_class;
+};
+
+GType gst_rtmp_server_connection_get_type (void);
+
+GstRtmpServerConnection *gst_rtmp_server_connection_new (
+ GSocketConnection *connection);
+
+G_END_DECLS
+
+#endif
diff --git a/rtmp/rtmppacket.c b/rtmp/rtmpstream.c
index de342c5..961e09c 100644
--- a/rtmp/rtmppacket.c
+++ b/rtmp/rtmpstream.c
@@ -22,20 +22,20 @@
#endif
#include <gst/gst.h>
-#include "rtmppacket.h"
+#include "rtmpstream.h"
-GST_DEBUG_CATEGORY_STATIC (gst_rtmp_packet_debug_category);
-#define GST_CAT_DEFAULT gst_rtmp_packet_debug_category
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_stream_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_stream_debug_category
/* prototypes */
-static void gst_rtmp_packet_set_property (GObject * object,
+static void gst_rtmp_stream_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec);
-static void gst_rtmp_packet_get_property (GObject * object,
+static void gst_rtmp_stream_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
-static void gst_rtmp_packet_dispose (GObject * object);
-static void gst_rtmp_packet_finalize (GObject * object);
+static void gst_rtmp_stream_dispose (GObject * object);
+static void gst_rtmp_stream_finalize (GObject * object);
enum
@@ -45,34 +45,34 @@ enum
/* class initialization */
-G_DEFINE_TYPE_WITH_CODE (GstRtmpPacket, gst_rtmp_packet, G_TYPE_OBJECT,
- GST_DEBUG_CATEGORY_INIT (gst_rtmp_packet_debug_category, "rtmppacket", 0,
- "debug category for rtmppacket element"));
+G_DEFINE_TYPE_WITH_CODE (GstRtmpStream, gst_rtmp_stream, G_TYPE_OBJECT,
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_stream_debug_category, "rtmpstream", 0,
+ "debug category for rtmpstream element"));
static void
-gst_rtmp_packet_class_init (GstRtmpPacketClass * klass)
+gst_rtmp_stream_class_init (GstRtmpStreamClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- gobject_class->set_property = gst_rtmp_packet_set_property;
- gobject_class->get_property = gst_rtmp_packet_get_property;
- gobject_class->dispose = gst_rtmp_packet_dispose;
- gobject_class->finalize = gst_rtmp_packet_finalize;
+ gobject_class->set_property = gst_rtmp_stream_set_property;
+ gobject_class->get_property = gst_rtmp_stream_get_property;
+ gobject_class->dispose = gst_rtmp_stream_dispose;
+ gobject_class->finalize = gst_rtmp_stream_finalize;
}
static void
-gst_rtmp_packet_init (GstRtmpPacket * rtmppacket)
+gst_rtmp_stream_init (GstRtmpStream * rtmpstream)
{
}
void
-gst_rtmp_packet_set_property (GObject * object, guint property_id,
+gst_rtmp_stream_set_property (GObject * object, guint property_id,
const GValue * value, GParamSpec * pspec)
{
- GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object);
+ GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object);
- GST_DEBUG_OBJECT (rtmppacket, "set_property");
+ GST_DEBUG_OBJECT (rtmpstream, "set_property");
switch (property_id) {
default:
@@ -82,12 +82,12 @@ gst_rtmp_packet_set_property (GObject * object, guint property_id,
}
void
-gst_rtmp_packet_get_property (GObject * object, guint property_id,
+gst_rtmp_stream_get_property (GObject * object, guint property_id,
GValue * value, GParamSpec * pspec)
{
- GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object);
+ GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object);
- GST_DEBUG_OBJECT (rtmppacket, "get_property");
+ GST_DEBUG_OBJECT (rtmpstream, "get_property");
switch (property_id) {
default:
@@ -97,25 +97,25 @@ gst_rtmp_packet_get_property (GObject * object, guint property_id,
}
void
-gst_rtmp_packet_dispose (GObject * object)
+gst_rtmp_stream_dispose (GObject * object)
{
- GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object);
+ GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object);
- GST_DEBUG_OBJECT (rtmppacket, "dispose");
+ GST_DEBUG_OBJECT (rtmpstream, "dispose");
/* clean up as possible. may be called multiple times */
- G_OBJECT_CLASS (gst_rtmp_packet_parent_class)->dispose (object);
+ G_OBJECT_CLASS (gst_rtmp_stream_parent_class)->dispose (object);
}
void
-gst_rtmp_packet_finalize (GObject * object)
+gst_rtmp_stream_finalize (GObject * object)
{
- GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object);
+ GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object);
- GST_DEBUG_OBJECT (rtmppacket, "finalize");
+ GST_DEBUG_OBJECT (rtmpstream, "finalize");
/* clean up object here */
- G_OBJECT_CLASS (gst_rtmp_packet_parent_class)->finalize (object);
+ G_OBJECT_CLASS (gst_rtmp_stream_parent_class)->finalize (object);
}
diff --git a/rtmp/rtmpstream.h b/rtmp/rtmpstream.h
new file mode 100644
index 0000000..00dc71e
--- /dev/null
+++ b/rtmp/rtmpstream.h
@@ -0,0 +1,52 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_STREAM_H_
+#define _GST_RTMP_STREAM_H_
+
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP_STREAM (gst_rtmp_stream_get_type())
+#define GST_RTMP_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_STREAM,GstRtmpStream))
+#define GST_RTMP_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_STREAM,GstRtmpStreamClass))
+#define GST_IS_RTMP_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_STREAM))
+#define GST_IS_RTMP_STREAM_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_STREAM))
+
+typedef struct _GstRtmpStream GstRtmpStream;
+typedef struct _GstRtmpStreamClass GstRtmpStreamClass;
+
+struct _GstRtmpStream
+{
+ GObject object;
+
+ int stream_id;
+
+};
+
+struct _GstRtmpStreamClass
+{
+ GObjectClass object_class;
+};
+
+GType gst_rtmp_stream_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/tools/Makefile.am b/tools/Makefile.am
index e69de29..e81c2df 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -0,0 +1,12 @@
+
+
+noinst_PROGRAMS = client-test proxy-server
+
+client_test_SOURCES = client-test.c
+client_test_CFLAGS = $(GST_RTMP_CFLAGS) $(GST_CFLAGS) -I../rtmp
+client_test_LDADD = ../rtmp/libgstrtmp-1.0.la $(GST_LIBS)
+
+proxy_server_SOURCES = proxy-server.c
+proxy_server_CFLAGS = $(GST_RTMP_CFLAGS) $(GST_CFLAGS) -I../rtmp
+proxy_server_LDADD = ../rtmp/libgstrtmp-1.0.la $(GST_LIBS)
+
diff --git a/tools/client-test.c b/tools/client-test.c
new file mode 100644
index 0000000..8a0e9f4
--- /dev/null
+++ b/tools/client-test.c
@@ -0,0 +1,88 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <gio/gio.h>
+#include <stdlib.h>
+#include "rtmpclient.h"
+
+#define GETTEXT_PACKAGE NULL
+
+gboolean verbose;
+
+static GOptionEntry entries[] = {
+ {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL},
+ {NULL}
+};
+
+static void
+connect_done (GObject *source, GAsyncResult *result, gpointer user_data);
+
+int
+main (int argc, char *argv[])
+{
+ GError *error = NULL;
+ GOptionContext *context;
+ GMainLoop *main_loop;
+ GstRtmpClient *client;
+ GCancellable *cancellable;
+
+ context = g_option_context_new ("- FIXME");
+ g_option_context_add_main_entries (context, entries, GETTEXT_PACKAGE);
+ g_option_context_add_group (context, gst_init_get_option_group ());
+ if (!g_option_context_parse (context, &argc, &argv, &error)) {
+ g_print ("option parsing failed: %s\n", error->message);
+ exit (1);
+ }
+ g_option_context_free (context);
+
+
+ client = gst_rtmp_client_new ();
+ cancellable = g_cancellable_new ();
+
+ main_loop = g_main_loop_new (NULL, TRUE);
+
+ gst_rtmp_client_connect_async (client, cancellable, connect_done,
+ client);
+
+ g_main_loop_run (main_loop);
+
+ exit (0);
+}
+
+static void
+connect_done (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ GstRtmpClient *client = user_data;
+ GError *error = NULL;
+ gboolean ret;
+
+ ret = gst_rtmp_client_connect_finish (client, result, &error);
+ if (!ret) {
+ GST_ERROR("error: %s", error->message);
+ g_error_free (error);
+ }
+
+ GST_ERROR("got here");
+}
+
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
new file mode 100644
index 0000000..3920d5f
--- /dev/null
+++ b/tools/proxy-server.c
@@ -0,0 +1,64 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <gio/gio.h>
+#include <stdlib.h>
+#include "rtmpserver.h"
+
+#define GETTEXT_PACKAGE NULL
+
+gboolean verbose;
+
+static GOptionEntry entries[] = {
+ {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL},
+ {NULL}
+};
+
+
+int
+main (int argc, char *argv[])
+{
+ GError *error = NULL;
+ GOptionContext *context;
+ GMainLoop *main_loop;
+ GstRtmpServer *server;
+
+ context = g_option_context_new ("- FIXME");
+ g_option_context_add_main_entries (context, entries, GETTEXT_PACKAGE);
+ g_option_context_add_group (context, gst_init_get_option_group ());
+ if (!g_option_context_parse (context, &argc, &argv, &error)) {
+ g_print ("option parsing failed: %s\n", error->message);
+ exit (1);
+ }
+ g_option_context_free (context);
+
+ server = gst_rtmp_server_new ();
+ gst_rtmp_server_start (server);
+
+ main_loop = g_main_loop_new (NULL, TRUE);
+ g_main_loop_run (main_loop);
+
+ exit (0);
+}
+