diff options
author | David Schleef <ds@schleef.org> | 2014-08-23 11:21:25 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-23 11:21:25 -0700 |
commit | 45301abe260ed6c517cfc31ff0d6f23a0f1d36d6 (patch) | |
tree | fa859bbb20c3419a2c41a1e619857aeb9aa204de | |
parent | fbdab241c6628d3498d587abb9af2e897f4a6855 (diff) |
hacking
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | rtmp/Makefile.am | 17 | ||||
-rw-r--r-- | rtmp/amf.c | 267 | ||||
-rw-r--r-- | rtmp/amf.h | 72 | ||||
-rw-r--r-- | rtmp/rtmpchunk.c | 121 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h (renamed from rtmp/rtmppacket.h) | 35 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 279 | ||||
-rw-r--r-- | rtmp/rtmpclient.h | 47 | ||||
-rw-r--r-- | rtmp/rtmpmessage.c | 121 | ||||
-rw-r--r-- | rtmp/rtmpmessage.h | 55 | ||||
-rw-r--r-- | rtmp/rtmpserver.c | 55 | ||||
-rw-r--r-- | rtmp/rtmpserver.h | 10 | ||||
-rw-r--r-- | rtmp/rtmpserverconnection.c | 593 | ||||
-rw-r--r-- | rtmp/rtmpserverconnection.h | 60 | ||||
-rw-r--r-- | rtmp/rtmpstream.c (renamed from rtmp/rtmppacket.c) | 60 | ||||
-rw-r--r-- | rtmp/rtmpstream.h | 52 | ||||
-rw-r--r-- | tools/Makefile.am | 12 | ||||
-rw-r--r-- | tools/client-test.c | 88 | ||||
-rw-r--r-- | tools/proxy-server.c | 64 |
19 files changed, 1950 insertions, 60 deletions
diff --git a/configure.ac b/configure.ac index 7de7917..b09f07b 100644 --- a/configure.ac +++ b/configure.ac @@ -105,7 +105,7 @@ if test "$HAVE_GLIB" != yes ; then exit 1 fi -PKG_CHECK_MODULES(GST, gstreamer-$GST_API_VERSION gstreamer-base-$GST_API_VERSION gstreamer-video-$GST_API_VERSION gstreamer-audio-$GST_API_VERSION, +PKG_CHECK_MODULES(GST, gstreamer-$GST_API_VERSION gstreamer-base-$GST_API_VERSION gstreamer-video-$GST_API_VERSION gstreamer-audio-$GST_API_VERSION gio-2.0, HAVE_GST=yes, HAVE_GST=no) if test "$HAVE_GST" != yes ; then echo GStreamer and gst-plugns-base are needed to build diff --git a/rtmp/Makefile.am b/rtmp/Makefile.am index 43cd961..52c061b 100644 --- a/rtmp/Makefile.am +++ b/rtmp/Makefile.am @@ -4,6 +4,7 @@ lib_LTLIBRARIES = libgstrtmp-@GST_API_VERSION@.la gstrtmp_@GST_API_VERSION@_includedir = $(includedir)/gstreamer-@GST_API_VERSION@/rtmp libgstrtmp_@GST_API_VERSION@_la_CFLAGS = \ + $(GST_RTMP_CFLAGS) \ $(GST_CFLAGS) \ $(SOUP_CFLAGS) libgstrtmp_@GST_API_VERSION@_la_LIBADD = \ @@ -16,11 +17,19 @@ libgstrtmp_@GST_API_VERSION@_la_SOURCES = \ $(sources) sources = \ - rtmpserver.c \ - rtmpserver.h \ + amf.c \ + amf.h \ rtmpclient.c \ rtmpclient.h \ rtmpconnection.c \ rtmpconnection.h \ - rtmppacket.c \ - rtmppacket.h + rtmpmessage.c \ + rtmpmessage.h \ + rtmpchunk.c \ + rtmpchunk.h \ + rtmpserver.c \ + rtmpserver.h \ + rtmpserverconnection.c \ + rtmpserverconnection.h \ + rtmpstream.c \ + rtmpstream.h diff --git a/rtmp/amf.c b/rtmp/amf.c new file mode 100644 index 0000000..153f663 --- /dev/null +++ b/rtmp/amf.c @@ -0,0 +1,267 @@ +/* GStreamer RTMP Library + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> + +#include "amf.h" + +typedef struct _AmfObjectField AmfObjectField; +struct _AmfObjectField +{ + char *name; + GstAmfNode *value; +}; + +typedef struct _AmfParser AmfParser; +struct _AmfParser +{ + const char *data; + gsize size; + int offset; + gboolean error; +}; + +static char *_parse_utf8_string (AmfParser * parser); +static void _parse_object (AmfParser * parser, GstAmfNode * node); +static GstAmfNode *_parse_value (AmfParser * parser); +static void amf_object_field_free (AmfObjectField * field); + + +GstAmfNode * +gst_amf_node_new (GstAmfType type) +{ + GstAmfNode *node; + + node = g_malloc0 (sizeof (GstAmfNode)); + node->type = type; + if (node->type == GST_AMF_TYPE_OBJECT) { + node->array_val = g_ptr_array_new (); + } + + return node; +} + +void +gst_amf_node_free (GstAmfNode * node) +{ + if (node->type == GST_AMF_TYPE_STRING) { + g_free (node->string_val); + } else if (node->type == GST_AMF_TYPE_OBJECT) { + g_ptr_array_foreach (node->array_val, (GFunc) amf_object_field_free, NULL); + g_ptr_array_free (node->array_val, TRUE); + } + + g_free (node); +} + +static int +_parse_u8 (AmfParser * parser) +{ + int x; + x = parser->data[parser->offset]; + parser->offset++; + return x; +} + +static int +_parse_u16 (AmfParser * parser) +{ + int x; + x = (parser->data[parser->offset] << 8) | parser->data[parser->offset + 1]; + parser->offset += 2; + return x; +} + +#if 0 +static int +_parse_u24 (AmfParser * parser) +{ + int x; + x = (parser->data[parser->offset] << 16) | + (parser->data[parser->offset + 1] << 8) | parser->data[parser->offset + + 2]; + parser->offset += 3; + return x; +} + +static int +_parse_u32 (AmfParser * parser) +{ + int x; + x = (parser->data[parser->offset] << 24) | + (parser->data[parser->offset + 1] << 16) | + (parser->data[parser->offset + 2] << 8) | parser->data[parser->offset + + 3]; + parser->offset += 4; + return x; +} +#endif + +static int +_parse_double (AmfParser * parser) +{ + double d; + int i; + guint8 *d_ptr = (guint8 *) & d; + for (i = 0; i < 8; i++) { + d_ptr[i] = parser->data[parser->offset + (7 - i)]; + } + parser->offset += 8; + return d; +} + +static char * +_parse_utf8_string (AmfParser * parser) +{ + int size; + char *s; + + size = _parse_u16 (parser); + if (parser->offset + size >= parser->size) { + parser->error = TRUE; + return NULL; + } + s = g_strndup (parser->data + parser->offset, size); + parser->offset += size; + + return s; +} + +static void +_parse_object (AmfParser * parser, GstAmfNode * node) +{ + while (1) { + char *s; + GstAmfNode *child_node; + s = _parse_utf8_string (parser); + child_node = _parse_value (parser); + if (child_node->type == GST_AMF_TYPE_OBJECT_END) { + gst_amf_node_free (child_node); + break; + } + gst_amf_object_append_take (node, s, child_node); + } +} + +static GstAmfNode * +_parse_value (AmfParser * parser) +{ + GstAmfNode *node = NULL; + GstAmfType type; + + type = _parse_u8 (parser); + node = gst_amf_node_new (type); + + GST_DEBUG ("parsing type %d", type); + + switch (type) { + case GST_AMF_TYPE_NUMBER: + gst_amf_node_set_double (node, _parse_double (parser)); + break; + case GST_AMF_TYPE_BOOLEAN: + gst_amf_node_set_boolean (node, _parse_u8 (parser)); + break; + case GST_AMF_TYPE_STRING: + gst_amf_node_set_string_take (node, _parse_utf8_string (parser)); + break; + case GST_AMF_TYPE_OBJECT: + _parse_object (parser, node); + break; + case GST_AMF_TYPE_MOVIECLIP: + GST_ERROR ("unimplemented"); + break; + case GST_AMF_TYPE_NULL: + break; + case GST_AMF_TYPE_OBJECT_END: + break; + default: + GST_ERROR ("unimplemented"); + break; + } + + return node; +} + +GstAmfNode * +gst_amf_node_new_parse (const char *data, int size, int *n_bytes) +{ + AmfParser _p = { 0 }, *parser = &_p; + GstAmfNode *node; + + parser->data = data; + parser->size = size; + node = _parse_value (parser); + + if (n_bytes) + *n_bytes = parser->offset; + return node; +} + +void +gst_amf_node_set_boolean (GstAmfNode * node, gboolean val) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_BOOLEAN); + node->int_val = val; +} + +void +gst_amf_node_set_double (GstAmfNode * node, double val) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_NUMBER); + node->double_val = val; +} + +void +gst_amf_node_set_string (GstAmfNode * node, const char *s) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_STRING); + node->string_val = g_strdup (s); +} + +void +gst_amf_node_set_string_take (GstAmfNode * node, char *s) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_STRING); + node->string_val = s; +} + +void +gst_amf_object_append_take (GstAmfNode * node, char *s, GstAmfNode * child_node) +{ + AmfObjectField *field; + + g_return_if_fail (node->type == GST_AMF_TYPE_OBJECT); + + field = g_malloc0 (sizeof (AmfObjectField)); + field->name = s; + field->value = child_node; + g_ptr_array_add (node->array_val, field); +} + +static void +amf_object_field_free (AmfObjectField * field) +{ + g_free (field->name); + gst_amf_node_free (field->value); + g_free (field); +} diff --git a/rtmp/amf.h b/rtmp/amf.h new file mode 100644 index 0000000..00b092a --- /dev/null +++ b/rtmp/amf.h @@ -0,0 +1,72 @@ +/* GStreamer RTMP Library + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_AMF_H_ +#define _GST_RTMP_AMF_H_ + +#include <glib.h> + +G_BEGIN_DECLS + +typedef enum { + GST_AMF_TYPE_NUMBER = 0, + GST_AMF_TYPE_BOOLEAN = 1, + GST_AMF_TYPE_STRING = 2, + GST_AMF_TYPE_OBJECT = 3, + GST_AMF_TYPE_MOVIECLIP = 4, + GST_AMF_TYPE_NULL = 5, + GST_AMF_TYPE_UNDEFINED = 6, + GST_AMF_TYPE_REFERENCE = 7, + GST_AMF_TYPE_ECMA_ARRAY = 8, + GST_AMF_TYPE_OBJECT_END = 9, + GST_AMF_TYPE_STRICT_ARRAY = 10, + GST_AMF_TYPE_DATE = 11, + GST_AMF_TYPE_LONG_STRING = 12, + GST_AMF_TYPE_UNSUPPORTED = 13, + GST_AMF_TYPE_RECORDSET = 14, + GST_AMF_TYPE_XML_DOCUMENT = 15, + GST_AMF_TYPE_TYPED_OBJECT = 16, + GST_AMF_TYPE_AVMPLUS_OBJECT = 17 +} GstAmfType; + + +struct _GstAmfNode { + GstAmfType type; + int int_val; + double double_val; + char *string_val; + GPtrArray *array_val; +}; +typedef struct _GstAmfNode GstAmfNode; + +GstAmfNode * gst_amf_node_new (GstAmfType type); +void gst_amf_node_free (GstAmfNode *node); + +GstAmfNode * gst_amf_node_new_parse (const char *data, int size, int *n_bytes); + +void gst_amf_node_set_boolean (GstAmfNode *node, gboolean val); +void gst_amf_node_set_double (GstAmfNode *node, double val); +void gst_amf_node_set_string (GstAmfNode *node, const char *s); +void gst_amf_node_set_string_take (GstAmfNode *node, char *s); +void gst_amf_object_append_take (GstAmfNode *node, char *s, GstAmfNode *child_node); + + +G_END_DECLS + +#endif diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c new file mode 100644 index 0000000..de80211 --- /dev/null +++ b/rtmp/rtmpchunk.c @@ -0,0 +1,121 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include "rtmpchunk.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_chunk_debug_category + +/* prototypes */ + + +static void gst_rtmp_chunk_set_property (GObject * object, + guint property_id, const GValue * value, GParamSpec * pspec); +static void gst_rtmp_chunk_get_property (GObject * object, + guint property_id, GValue * value, GParamSpec * pspec); +static void gst_rtmp_chunk_dispose (GObject * object); +static void gst_rtmp_chunk_finalize (GObject * object); + + +enum +{ + PROP_0 +}; + +/* class initialization */ + +G_DEFINE_TYPE_WITH_CODE (GstRtmpChunk, gst_rtmp_chunk, G_TYPE_OBJECT, + GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_debug_category, "rtmpchunk", 0, + "debug category for rtmpchunk element")); + +static void +gst_rtmp_chunk_class_init (GstRtmpChunkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->set_property = gst_rtmp_chunk_set_property; + gobject_class->get_property = gst_rtmp_chunk_get_property; + gobject_class->dispose = gst_rtmp_chunk_dispose; + gobject_class->finalize = gst_rtmp_chunk_finalize; + +} + +static void +gst_rtmp_chunk_init (GstRtmpChunk * rtmpchunk) +{ +} + +void +gst_rtmp_chunk_set_property (GObject * object, guint property_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object); + + GST_DEBUG_OBJECT (rtmpchunk, "set_property"); + + switch (property_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +void +gst_rtmp_chunk_get_property (GObject * object, guint property_id, + GValue * value, GParamSpec * pspec) +{ + GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object); + + GST_DEBUG_OBJECT (rtmpchunk, "get_property"); + + switch (property_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +void +gst_rtmp_chunk_dispose (GObject * object) +{ + GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object); + + GST_DEBUG_OBJECT (rtmpchunk, "dispose"); + + /* clean up as possible. may be called multiple times */ + + G_OBJECT_CLASS (gst_rtmp_chunk_parent_class)->dispose (object); +} + +void +gst_rtmp_chunk_finalize (GObject * object) +{ + GstRtmpChunk *rtmpchunk = GST_RTMP_CHUNK (object); + + GST_DEBUG_OBJECT (rtmpchunk, "finalize"); + + /* clean up object here */ + + G_OBJECT_CLASS (gst_rtmp_chunk_parent_class)->finalize (object); +} diff --git a/rtmp/rtmppacket.h b/rtmp/rtmpchunk.h index 2113dc6..5e799ee 100644 --- a/rtmp/rtmppacket.h +++ b/rtmp/rtmpchunk.h @@ -17,39 +17,40 @@ * Boston, MA 02110-1301, USA. */ -#ifndef _GST_RTMP_PACKET_H_ -#define _GST_RTMP_PACKET_H_ +#ifndef _GST_RTMP_CHUNK_H_ +#define _GST_RTMP_CHUNK_H_ G_BEGIN_DECLS -#define GST_TYPE_RTMP_PACKET (gst_rtmp_packet_get_type()) -#define GST_RTMP_PACKET(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_PACKET,GstRtmpPacket)) -#define GST_RTMP_PACKET_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_PACKET,GstRtmpPacketClass)) -#define GST_IS_RTMP_PACKET(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_PACKET)) -#define GST_IS_RTMP_PACKET_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_PACKET)) +#define GST_TYPE_RTMP_CHUNK (gst_rtmp_chunk_get_type()) +#define GST_RTMP_CHUNK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_CHUNK,GstRtmpChunk)) +#define GST_RTMP_CHUNK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_CHUNK,GstRtmpChunkClass)) +#define GST_IS_RTMP_CHUNK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_CHUNK)) +#define GST_IS_RTMP_CHUNK_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_CHUNK)) -typedef struct _GstRtmpPacket GstRtmpPacket; -typedef struct _GstRtmpPacketClass GstRtmpPacketClass; +typedef struct _GstRtmpChunk GstRtmpChunk; +typedef struct _GstRtmpChunkClass GstRtmpChunkClass; -struct _GstRtmpPacket +struct _GstRtmpChunk { GObject object; - char *request_data; - gsize request_length; - - char *response_data; - gsize response_length; + guint32 timestamp; + int type_id; + guint32 chunk_stream_id; + guint32 stream_id; + int length; + guint8 *data; }; -struct _GstRtmpPacketClass +struct _GstRtmpChunkClass { GObjectClass object_class; }; -GType gst_rtmp_packet_get_type (void); +GType gst_rtmp_chunk_get_type (void); G_END_DECLS diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 2fde5fe..9db1519 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -22,6 +22,8 @@ #endif #include <gst/gst.h> +#include <gio/gio.h> +#include <string.h> #include "rtmpclient.h" GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category); @@ -36,6 +38,21 @@ static void gst_rtmp_client_get_property (GObject * object, static void gst_rtmp_client_dispose (GObject * object); static void gst_rtmp_client_finalize (GObject * object); +static void +gst_rtmp_client_connect_start (GstRtmpClient * client, + GCancellable * cancellable, GSimpleAsyncResult * async); +static void +gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result, + gpointer user_data); +static void +gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result, + gpointer user_data); +static void +gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result, + gpointer user_data); +static void +gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result, + gpointer user_data); enum { @@ -120,3 +137,265 @@ gst_rtmp_client_finalize (GObject * object) G_OBJECT_CLASS (gst_rtmp_client_parent_class)->finalize (object); } + +/* API */ + +GstRtmpClient * +gst_rtmp_client_new (void) +{ + + return g_object_new (GST_TYPE_RTMP_CLIENT, NULL); + +} + +void +gst_rtmp_client_set_host (GstRtmpClient * client, const char *host) +{ + g_free (client->host); + client->host = g_strdup (host); +} + +void +gst_rtmp_client_set_stream (GstRtmpClient * client, const char *stream) +{ + g_free (client->stream); + client->stream = g_strdup (stream); +} + +void +gst_rtmp_client_connect_async (GstRtmpClient * client, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *simple; + + if (client->state != GST_RTMP_CLIENT_STATE_NEW) { + g_simple_async_report_error_in_idle (G_OBJECT (client), + callback, user_data, GST_RTMP_ERROR, + GST_RTMP_ERROR_TOO_LAZY, "already connected"); + return; + } + + simple = g_simple_async_result_new (G_OBJECT (client), + callback, user_data, gst_rtmp_client_connect_async); + + gst_rtmp_client_connect_start (client, cancellable, simple); +} + +gboolean +gst_rtmp_client_connect_finish (GstRtmpClient * client, + GAsyncResult * result, GError ** error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (g_simple_async_result_is_valid (result, + G_OBJECT (client), gst_rtmp_client_connect_async), FALSE); + + simple = (GSimpleAsyncResult *) result; + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + return TRUE; +} + + +/* async implementation */ + +/* connect */ + +typedef struct _ConnectContext ConnectContext; +struct _ConnectContext +{ + GstRtmpClient *client; + GSimpleAsyncResult *async; + GCancellable *cancellable; + + GSocketClient *socket_client; + GSocketConnection *connection; + guint8 *handshake_send; + guint8 *handshake_recv; +}; + +static void +gst_rtmp_client_connect_complete (ConnectContext * context) +{ + GSimpleAsyncResult *async = context->async; + + if (context->socket_client) + g_object_unref (context->socket_client); + if (context->connection) + g_object_unref (context->connection); + g_free (context->handshake_send); + g_free (context->handshake_recv); + g_free (context); + g_simple_async_result_complete (async); +} + +static void +gst_rtmp_client_connect_start (GstRtmpClient * client, + GCancellable * cancellable, GSimpleAsyncResult * async) +{ + ConnectContext *context; + GSocketConnectable *addr; + + context = g_malloc0 (sizeof (ConnectContext)); + context->cancellable = cancellable; + context->client = client; + context->async = async; + + addr = g_network_address_new ("localhost", 1935); + context->socket_client = g_socket_client_new (); + + GST_ERROR ("g_socket_client_connect_async"); + g_socket_client_connect_async (context->socket_client, addr, + context->cancellable, gst_rtmp_client_connect_1, context); +} + + +static void +gst_rtmp_client_connect_1 (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + ConnectContext *context = user_data; + GError *error = NULL; + GOutputStream *os; + + GST_ERROR ("g_socket_client_connect_finish"); + context->connection = g_socket_client_connect_finish (context->socket_client, + result, &error); + if (context->connection == NULL) { + GST_ERROR ("error"); + g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); + g_error_free (error); + gst_rtmp_client_connect_complete (context); + return; + } + + context->handshake_send = g_malloc (1537); + context->handshake_send[0] = 3; + memset (context->handshake_send + 1, 0, 8); + memset (context->handshake_send + 9, 0xa5, 1528); + + os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection)); + + GST_ERROR ("g_output_stream_write_async"); + g_output_stream_write_async (os, context->handshake_send, 1537, + G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_2, + context); +} + +static void +gst_rtmp_client_connect_2 (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + ConnectContext *context = user_data; + GOutputStream *os; + GInputStream *is; + GError *error = NULL; + gssize n; + + GST_ERROR ("g_output_stream_write_finish"); + os = g_io_stream_get_output_stream (G_IO_STREAM (context->connection)); + n = g_output_stream_write_finish (os, result, &error); + GST_ERROR ("wrote %" G_GSIZE_FORMAT " bytes", n); + if (error) { + GST_ERROR ("error"); + g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); + g_error_free (error); + gst_rtmp_client_connect_complete (context); + return; + } + + context->handshake_recv = g_malloc (1537); + + GST_ERROR ("g_input_stream_read_async"); + is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); + g_input_stream_read_async (is, context->handshake_recv, 1537, + G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_3, + context); +} + +static void +gst_rtmp_client_connect_3 (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + ConnectContext *context = user_data; + GInputStream *is; + GError *error = NULL; + gssize n; + + GST_ERROR ("g_input_stream_read_finish"); + is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); + n = g_input_stream_read_finish (is, result, &error); + GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n); + if (error) { + GST_ERROR ("error"); + g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); + g_error_free (error); + gst_rtmp_client_connect_complete (context); + return; + } +#if 0 + GST_ERROR ("recv: %02x %02x %02x %02x %02x %02x %02x %02x %02x", + context->handshake_recv[0], context->handshake_recv[1], + context->handshake_recv[2], context->handshake_recv[3], + context->handshake_recv[4], context->handshake_recv[5], + context->handshake_recv[6], context->handshake_recv[7], + context->handshake_recv[8]); +#endif + + n = g_output_stream_write (g_io_stream_get_output_stream (G_IO_STREAM + (context->connection)), context->handshake_recv + 1, 1536, + context->cancellable, &error); + if (n < 1536) { + GST_ERROR ("error"); + g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); + g_error_free (error); + gst_rtmp_client_connect_complete (context); + return; + } + + GST_ERROR ("g_input_stream_read_async"); + is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); + g_input_stream_read_async (is, context->handshake_recv + 1, 1536, + G_PRIORITY_DEFAULT, context->cancellable, gst_rtmp_client_connect_4, + context); + +} + +static void +gst_rtmp_client_connect_4 (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + ConnectContext *context = user_data; + GInputStream *is; + GError *error = NULL; + gssize n; + + GST_ERROR ("g_input_stream_read_finish"); + is = g_io_stream_get_input_stream (G_IO_STREAM (context->connection)); + n = g_input_stream_read_finish (is, result, &error); + GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", n); + if (error) { + GST_ERROR ("error"); + g_simple_async_result_set_error (context->async, GST_RTMP_ERROR, + GST_RTMP_ERROR_TOO_LAZY, "%s", error->message); + g_error_free (error); + gst_rtmp_client_connect_complete (context); + return; + } + + context->client->socket_client = context->socket_client; + context->socket_client = NULL; + context->client->connection = context->connection; + context->connection = NULL; + context->client->state = GST_RTMP_CLIENT_STATE_CONNECTED; + + GST_ERROR ("got here"); + gst_rtmp_client_connect_complete (context); +} diff --git a/rtmp/rtmpclient.h b/rtmp/rtmpclient.h index d9f078f..8a1b8b1 100644 --- a/rtmp/rtmpclient.h +++ b/rtmp/rtmpclient.h @@ -20,7 +20,8 @@ #ifndef _GST_RTMP_CLIENT_H_ #define _GST_RTMP_CLIENT_H_ -#include <rtmp/rtmppacket.h> +#include <rtmp/rtmpmessage.h> +#include <rtmp/rtmpchunk.h> G_BEGIN_DECLS @@ -33,22 +34,42 @@ G_BEGIN_DECLS typedef struct _GstRtmpClient GstRtmpClient; typedef struct _GstRtmpClientClass GstRtmpClientClass; -typedef void (*GstRtmpClientCallback) (GstRtmpClient *client, - GstRtmpPacket *packet, gpointer user_data); +typedef void (*GstRtmpClientMessageCallback) (GstRtmpClient *client, + GstRtmpMessage *message, gpointer user_data); +typedef void (*GstRtmpClientChunkCallback) (GstRtmpClient *client, + GstRtmpChunk *chunk, gpointer user_data); + +#define GST_RTMP_ERROR g_quark_from_static_string ("GstRtmpError") + +enum { + GST_RTMP_ERROR_TOO_LAZY = 0 +}; + +typedef enum { + GST_RTMP_CLIENT_STATE_NEW, + GST_RTMP_CLIENT_STATE_CONNECTING, + GST_RTMP_CLIENT_STATE_CONNECTED, +} GstRtmpClientState; + struct _GstRtmpClient { GObject object; /* properties */ - char *server_host; - + char *host; + int port; + char *stream; /* private */ + GstRtmpClientState state; GMutex lock; GCond cond; GMainContext *context; + GSocketClient *socket_client; + GSocketConnection *connection; + }; struct _GstRtmpClientClass @@ -56,7 +77,8 @@ struct _GstRtmpClientClass GObjectClass object_class; /* signals */ - void (*got_packet) (GstRtmpClient *client, GstRtmpPacket *packet); + void (*got_chunk) (GstRtmpClient *client, GstRtmpChunk *chunk); + void (*got_message) (GstRtmpClient *client, GstRtmpMessage *message); }; @@ -65,8 +87,17 @@ GType gst_rtmp_client_get_type (void); GstRtmpClient *gst_rtmp_client_new (void); void gst_rtmp_client_set_url (GstRtmpClient *client, const char *url); -void gst_rtmp_client_queue_packet (GstRtmpClient *client, - GstRtmpPacket *packet, GstRtmpClientCallback callback, +void gst_rtmp_client_connect_async (GstRtmpClient *client, + GCancellable *cancellable, GAsyncReadyCallback callback, + gpointer user_data); +gboolean gst_rtmp_client_connect_finish (GstRtmpClient *client, + GAsyncResult *result, GError **error); + +void gst_rtmp_client_queue_message (GstRtmpClient *client, + GstRtmpMessage *message, GstRtmpClientMessageCallback callback, + gpointer user_data); +void gst_rtmp_client_queue_chunk (GstRtmpClient *client, + GstRtmpChunk *Chunk, GstRtmpClientChunkCallback callback, gpointer user_data); diff --git a/rtmp/rtmpmessage.c b/rtmp/rtmpmessage.c new file mode 100644 index 0000000..0ba1255 --- /dev/null +++ b/rtmp/rtmpmessage.c @@ -0,0 +1,121 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include "rtmpmessage.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_message_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_message_debug_category + +/* prototypes */ + + +static void gst_rtmp_message_set_property (GObject * object, + guint property_id, const GValue * value, GParamSpec * pspec); +static void gst_rtmp_message_get_property (GObject * object, + guint property_id, GValue * value, GParamSpec * pspec); +static void gst_rtmp_message_dispose (GObject * object); +static void gst_rtmp_message_finalize (GObject * object); + + +enum +{ + PROP_0 +}; + +/* class initialization */ + +G_DEFINE_TYPE_WITH_CODE (GstRtmpMessage, gst_rtmp_message, G_TYPE_OBJECT, + GST_DEBUG_CATEGORY_INIT (gst_rtmp_message_debug_category, "rtmpmessage", 0, + "debug category for rtmpmessage element")); + +static void +gst_rtmp_message_class_init (GstRtmpMessageClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->set_property = gst_rtmp_message_set_property; + gobject_class->get_property = gst_rtmp_message_get_property; + gobject_class->dispose = gst_rtmp_message_dispose; + gobject_class->finalize = gst_rtmp_message_finalize; + +} + +static void +gst_rtmp_message_init (GstRtmpMessage * rtmpmessage) +{ +} + +void +gst_rtmp_message_set_property (GObject * object, guint property_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object); + + GST_DEBUG_OBJECT (rtmpmessage, "set_property"); + + switch (property_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +void +gst_rtmp_message_get_property (GObject * object, guint property_id, + GValue * value, GParamSpec * pspec) +{ + GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object); + + GST_DEBUG_OBJECT (rtmpmessage, "get_property"); + + switch (property_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +void +gst_rtmp_message_dispose (GObject * object) +{ + GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object); + + GST_DEBUG_OBJECT (rtmpmessage, "dispose"); + + /* clean up as possible. may be called multiple times */ + + G_OBJECT_CLASS (gst_rtmp_message_parent_class)->dispose (object); +} + +void +gst_rtmp_message_finalize (GObject * object) +{ + GstRtmpMessage *rtmpmessage = GST_RTMP_MESSAGE (object); + + GST_DEBUG_OBJECT (rtmpmessage, "finalize"); + + /* clean up object here */ + + G_OBJECT_CLASS (gst_rtmp_message_parent_class)->finalize (object); +} diff --git a/rtmp/rtmpmessage.h b/rtmp/rtmpmessage.h new file mode 100644 index 0000000..053d1db --- /dev/null +++ b/rtmp/rtmpmessage.h @@ -0,0 +1,55 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_MESSAGE_H_ +#define _GST_RTMP_MESSAGE_H_ + + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP_MESSAGE (gst_rtmp_message_get_type()) +#define GST_RTMP_MESSAGE(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_MESSAGE,GstRtmpMessage)) +#define GST_RTMP_MESSAGE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_MESSAGE,GstRtmpMessageClass)) +#define GST_IS_RTMP_MESSAGE(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_MESSAGE)) +#define GST_IS_RTMP_MESSAGE_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_MESSAGE)) + +typedef struct _GstRtmpMessage GstRtmpMessage; +typedef struct _GstRtmpMessageClass GstRtmpMessageClass; + +struct _GstRtmpMessage +{ + GObject object; + + int message_type; + guint32 payload_length; + guint32 timestamp; + guint32 stream_id; + +}; + +struct _GstRtmpMessageClass +{ + GObjectClass object_class; +}; + +GType gst_rtmp_message_get_type (void); + +G_END_DECLS + +#endif diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c index 3c6bc88..0b089b4 100644 --- a/rtmp/rtmpserver.c +++ b/rtmp/rtmpserver.c @@ -23,6 +23,7 @@ #include <gst/gst.h> #include "rtmpserver.h" +#include "rtmpserverconnection.h" GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_debug_category); #define GST_CAT_DEFAULT gst_rtmp_server_debug_category @@ -35,6 +36,9 @@ static void gst_rtmp_server_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp_server_dispose (GObject * object); static void gst_rtmp_server_finalize (GObject * object); +static gboolean gst_rtmp_server_incoming (GSocketService * service, + GSocketConnection * connection, GObject * source_object, + gpointer user_data); enum @@ -63,6 +67,7 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass) static void gst_rtmp_server_init (GstRtmpServer * rtmpserver) { + rtmpserver->port = 11935; } void @@ -118,3 +123,53 @@ gst_rtmp_server_finalize (GObject * object) G_OBJECT_CLASS (gst_rtmp_server_parent_class)->finalize (object); } + +GstRtmpServer * +gst_rtmp_server_new (void) +{ + return g_object_new (GST_TYPE_RTMP_SERVER, NULL); +} + +void +gst_rtmp_server_start (GstRtmpServer * rtmpserver) +{ + gboolean ret; + GError *error = NULL; + + if (rtmpserver->socket_service) { + GST_ERROR ("rtmp server already started"); + return; + } + + rtmpserver->socket_service = g_socket_service_new (); + + ret = + g_socket_listener_add_inet_port (G_SOCKET_LISTENER (rtmpserver-> + socket_service), rtmpserver->port, NULL, &error); + if (!ret) { + GST_ERROR ("failed to add address: %s", error->message); + g_object_unref (rtmpserver->socket_service); + rtmpserver->socket_service = NULL; + return; + } + + g_signal_connect (rtmpserver->socket_service, "incoming", + G_CALLBACK (gst_rtmp_server_incoming), rtmpserver); +} + +static gboolean +gst_rtmp_server_incoming (GSocketService * service, + GSocketConnection * connection, GObject * source_object, gpointer user_data) +{ + GstRtmpServer *rtmpserver = GST_RTMP_SERVER (user_data); + GstRtmpServerConnection *server_connection; + + GST_ERROR ("client connected"); + + g_object_ref (connection); + server_connection = gst_rtmp_server_connection_new (connection); + (void) server_connection; + (void) rtmpserver; + + return TRUE; +} diff --git a/rtmp/rtmpserver.h b/rtmp/rtmpserver.h index 0437dca..e1e0466 100644 --- a/rtmp/rtmpserver.h +++ b/rtmp/rtmpserver.h @@ -20,6 +20,7 @@ #ifndef _GST_RTMP_SERVER_H_ #define _GST_RTMP_SERVER_H_ +#include <gio/gio.h> G_BEGIN_DECLS @@ -36,6 +37,12 @@ struct _GstRtmpServer { GObject object; + /* properties */ + int port; + + /* private */ + GSocketService *socket_service; + }; struct _GstRtmpServerClass @@ -45,6 +52,9 @@ struct _GstRtmpServerClass GType gst_rtmp_server_get_type (void); +GstRtmpServer *gst_rtmp_server_new (void); +void gst_rtmp_server_start (GstRtmpServer * rtmpserver); + G_END_DECLS #endif diff --git a/rtmp/rtmpserverconnection.c b/rtmp/rtmpserverconnection.c new file mode 100644 index 0000000..b3b08da --- /dev/null +++ b/rtmp/rtmpserverconnection.c @@ -0,0 +1,593 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include "rtmpserverconnection.h" +#include "amf.h" + +#include <string.h> + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_connection_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_server_connection_debug_category + +/* prototypes */ + +static void gst_rtmp_server_connection_set_property (GObject * object, + guint property_id, const GValue * value, GParamSpec * pspec); +static void gst_rtmp_server_connection_get_property (GObject * object, + guint property_id, GValue * value, GParamSpec * pspec); +static void gst_rtmp_server_connection_dispose (GObject * object); +static void gst_rtmp_server_connection_finalize (GObject * object); + +static void proxy_connect (GstRtmpServerConnection * sc); +static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * + sc); + +enum +{ + PROP_0 +}; + +/* class initialization */ + +G_DEFINE_TYPE_WITH_CODE (GstRtmpServerConnection, gst_rtmp_server_connection, + G_TYPE_OBJECT, + GST_DEBUG_CATEGORY_INIT (gst_rtmp_server_connection_debug_category, + "rtmpserverconnection", 0, + "debug category for GstRtmpServerConnection class")); + +static void +gst_rtmp_server_connection_class_init (GstRtmpServerConnectionClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->set_property = gst_rtmp_server_connection_set_property; + gobject_class->get_property = gst_rtmp_server_connection_get_property; + gobject_class->dispose = gst_rtmp_server_connection_dispose; + gobject_class->finalize = gst_rtmp_server_connection_finalize; + +} + +static void +gst_rtmp_server_connection_init (GstRtmpServerConnection * rtmpserverconnection) +{ + rtmpserverconnection->cancellable = g_cancellable_new (); +} + +void +gst_rtmp_server_connection_set_property (GObject * object, guint property_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtmpServerConnection *rtmpserverconnection = + GST_RTMP_SERVER_CONNECTION (object); + + GST_DEBUG_OBJECT (rtmpserverconnection, "set_property"); + + switch (property_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +void +gst_rtmp_server_connection_get_property (GObject * object, guint property_id, + GValue * value, GParamSpec * pspec) +{ + GstRtmpServerConnection *rtmpserverconnection = + GST_RTMP_SERVER_CONNECTION (object); + + GST_DEBUG_OBJECT (rtmpserverconnection, "get_property"); + + switch (property_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +void +gst_rtmp_server_connection_dispose (GObject * object) +{ + GstRtmpServerConnection *rtmpserverconnection = + GST_RTMP_SERVER_CONNECTION (object); + + GST_DEBUG_OBJECT (rtmpserverconnection, "dispose"); + + /* clean up as possible. may be called multiple times */ + + G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->dispose (object); +} + +void +gst_rtmp_server_connection_finalize (GObject * object) +{ + GstRtmpServerConnection *rtmpserverconnection = + GST_RTMP_SERVER_CONNECTION (object); + + GST_DEBUG_OBJECT (rtmpserverconnection, "finalize"); + + /* clean up object here */ + + G_OBJECT_CLASS (gst_rtmp_server_connection_parent_class)->finalize (object); +} + +GstRtmpServerConnection * +gst_rtmp_server_connection_new (GSocketConnection * connection) +{ + GstRtmpServerConnection *sc; + + sc = g_object_new (GST_TYPE_RTMP_SERVER_CONNECTION, NULL); + sc->connection = connection; + + //proxy_connect (sc); + //gst_rtmp_server_connection_read_chunk (sc); + gst_rtmp_server_connection_handshake1 (sc); + + return sc; +} + +static void +gst_rtmp_server_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); + +typedef struct _ChunkRead ChunkRead; +struct _ChunkRead +{ + gpointer data; + gsize alloc_size; + gsize size; + GstRtmpServerConnection *server_connection; +}; + + +static void proxy_connect (GstRtmpServerConnection * sc); +static void proxy_connect_done (GObject * obj, GAsyncResult * res, + gpointer user_data); +static void gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * + sc); +static void gst_rtmp_server_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk); +static void proxy_write_done (GObject * obj, GAsyncResult * res, + gpointer user_data); +static void proxy_read_chunk (GstRtmpServerConnection * sc); +static void proxy_read_done (GObject * obj, GAsyncResult * res, + gpointer user_data); +static void gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection * + sc, ChunkRead * chunk); +static void gst_rtmp_server_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * + sc); +static void gst_rtmp_server_connection_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc, + GBytes * bytes); +static void gst_rtmp_server_connection_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data); +static void gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection * + sc); +static void gst_rtmp_server_connection_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data); + + +static void +proxy_connect (GstRtmpServerConnection * sc) +{ + GSocketConnectable *addr; + + GST_ERROR ("proxy_connect"); + + addr = g_network_address_new ("localhost", 1935); + + sc->socket_client = g_socket_client_new (); + g_socket_client_connect_async (sc->socket_client, addr, + sc->cancellable, proxy_connect_done, sc); +} + +static void +proxy_connect_done (GObject * obj, GAsyncResult * res, gpointer user_data) +{ + GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); + GError *error = NULL; + + GST_ERROR ("proxy_connect_done"); + + sc->proxy_connection = g_socket_client_connect_finish (sc->socket_client, + res, &error); + if (sc->proxy_connection == NULL) { + GST_ERROR ("connection error: %s", error->message); + g_error_free (error); + return; + } + + gst_rtmp_server_connection_read_chunk (sc); +} + +static void +gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * sc) +{ + GInputStream *is; + ChunkRead *chunk; + + chunk = g_malloc0 (sizeof (ChunkRead)); + chunk->alloc_size = 4096; + chunk->data = g_malloc (chunk->alloc_size); + chunk->server_connection = sc; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + + g_input_stream_read_async (is, chunk->data, chunk->alloc_size, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_server_connection_read_chunk_done, chunk); +} + +static void +parse_message (guint8 * data, int size) +{ + int offset; + int bytes_read; + GstAmfNode *node; + + offset = 4; + + node = gst_amf_node_new_parse ((const char *) (data + offset), + size - offset, &bytes_read); + offset += bytes_read; + g_print ("bytes_read: %d\n", bytes_read); + if (node) + gst_amf_node_free (node); + + node = gst_amf_node_new_parse ((const char *) (data + offset), + size - offset, &bytes_read); + offset += bytes_read; + g_print ("bytes_read: %d\n", bytes_read); + if (node) + gst_amf_node_free (node); + + node = gst_amf_node_new_parse ((const char *) (data + offset), + size - offset, &bytes_read); + offset += bytes_read; + g_print ("bytes_read: %d\n", bytes_read); + if (node) + gst_amf_node_free (node); + +} + +typedef struct _Chunk Chunk; +struct _Chunk +{ + int stream_id; + int header_fmt; + guint32 timestamp; + int message_length; + int message_type_id; + guint32 message_stream_id; +}; + +static void +parse_chunk (guint8 * data, int size) +{ + Chunk _chunk, *chunk = &_chunk; + int offset; + + chunk->header_fmt = data[0] >> 6; + chunk->stream_id = data[0] & 0x3f; + offset = 1; + if (chunk->stream_id == 0) { + chunk->stream_id = 64 + data[1]; + offset = 2; + } else if (chunk->stream_id == 1) { + chunk->stream_id = 64 + data[1] + (data[2] << 8); + offset = 3; + } + chunk->timestamp = + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + chunk->message_length = + (data[offset + 3] << 16) | (data[offset + 4] << 8) | data[offset + 5]; + chunk->message_type_id = data[offset + 6]; + offset += 7; + + g_print ("header_fmt: %d\n", chunk->header_fmt); + g_print ("stream_id: %d\n", chunk->stream_id); + g_print ("timestamp: %d\n", chunk->timestamp); + g_print ("message_length: %d\n", chunk->message_length); + g_print ("message_type_id: %d\n", chunk->message_type_id); + + parse_message (data + offset, size - offset); +} + +static void +dump_data (guint8 * data, int size) +{ + int i, j; + for (i = 0; i < size; i += 16) { + g_print ("%04x: ", i); + for (j = 0; j < 16; j++) { + if (i + j < size) { + g_print ("%02x ", data[i + j]); + } else { + g_print (" "); + } + } + for (j = 0; j < 16; j++) { + if (i + j < size) { + g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.'); + } + } + g_print ("\n"); + } +} + +static void +gst_rtmp_server_connection_read_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("gst_rtmp_server_connection_read_chunk_done"); + + ret = g_input_stream_read_finish (is, res, &error); + if (ret < 0) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", ret); + chunk->size = ret; + + parse_chunk (chunk->data, chunk->size); + dump_data (chunk->data, chunk->size); + + if (0) + proxy_write_chunk (chunk->server_connection, chunk); +} + +static void +proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk) +{ + GOutputStream *os; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->proxy_connection)); + + g_output_stream_write_async (os, chunk->data, chunk->size, + G_PRIORITY_DEFAULT, sc->cancellable, proxy_write_done, chunk); +} + +static void +proxy_write_done (GObject * obj, GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("proxy_write_done"); + + ret = g_output_stream_write_finish (os, res, &error); + if (ret < 0) { + GST_ERROR ("write error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("proxy write %" G_GSSIZE_FORMAT " bytes", ret); + + proxy_read_chunk (chunk->server_connection); + + g_free (chunk->data); + g_free (chunk); +} + +static void +proxy_read_chunk (GstRtmpServerConnection * sc) +{ + GInputStream *is; + ChunkRead *chunk; + + chunk = g_malloc0 (sizeof (ChunkRead)); + chunk->alloc_size = 4096; + chunk->data = g_malloc (chunk->alloc_size); + chunk->server_connection = sc; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->proxy_connection)); + + g_input_stream_read_async (is, chunk->data, chunk->alloc_size, + G_PRIORITY_DEFAULT, sc->cancellable, proxy_read_done, chunk); +} + +static void +proxy_read_done (GObject * obj, GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("proxy_read_done"); + + ret = g_input_stream_read_finish (is, res, &error); + if (ret < 0) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("proxy read %" G_GSSIZE_FORMAT " bytes", ret); + chunk->size = ret; + + gst_rtmp_server_connection_write_chunk (chunk->server_connection, chunk); +} + +static void +gst_rtmp_server_connection_write_chunk (GstRtmpServerConnection * sc, + ChunkRead * chunk) +{ + GOutputStream *os; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + + g_output_stream_write_async (os, chunk->data, chunk->size, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_server_connection_write_chunk_done, chunk); +} + +static void +gst_rtmp_server_connection_write_chunk_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + ChunkRead *chunk = (ChunkRead *) user_data; + GError *error = NULL; + gssize ret; + + GST_ERROR ("gst_rtmp_server_connection_write_chunk_done"); + + ret = g_output_stream_write_finish (os, res, &error); + if (ret < 0) { + GST_ERROR ("write error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); + + gst_rtmp_server_connection_read_chunk (chunk->server_connection); + + g_free (chunk->data); + g_free (chunk); +} + +static void +gst_rtmp_server_connection_handshake1 (GstRtmpServerConnection * sc) +{ + GInputStream *is; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + + g_input_stream_read_bytes_async (is, 4096, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_server_connection_handshake1_done, sc); +} + +static void +gst_rtmp_server_connection_handshake1_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); + GError *error = NULL; + GBytes *bytes; + + GST_ERROR ("gst_rtmp_server_connection_handshake1_done"); + + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_server_connection_handshake2 (sc, bytes); +} + +static void +gst_rtmp_server_connection_handshake2 (GstRtmpServerConnection * sc, + GBytes * bytes) +{ + GOutputStream *os; + guint8 *data; + GBytes *out_bytes; + + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); + + data = g_malloc (1 + 1536 + 1536); + memcpy (data, g_bytes_get_data (bytes, NULL), 1 + 1536); + memset (data + 1537, 0, 8); + memset (data + 1537 + 8, 0xef, 1528); + + out_bytes = g_bytes_new_take (data, 1 + 1536 + 1536); + + g_output_stream_write_bytes_async (os, out_bytes, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_server_connection_handshake2_done, sc); +} + +static void +gst_rtmp_server_connection_handshake2_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); + GError *error = NULL; + gssize ret; + + GST_ERROR ("gst_rtmp_server_connection_handshake2_done"); + + ret = g_output_stream_write_bytes_finish (os, res, &error); + if (ret < 1 + 1536 + 1536) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("wrote %" G_GSSIZE_FORMAT " bytes", ret); + + gst_rtmp_server_connection_handshake3 (sc); +} + +static void +gst_rtmp_server_connection_handshake3 (GstRtmpServerConnection * sc) +{ + GInputStream *is; + + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + + g_input_stream_read_bytes_async (is, 1536, + G_PRIORITY_DEFAULT, sc->cancellable, + gst_rtmp_server_connection_handshake3_done, sc); +} + +static void +gst_rtmp_server_connection_handshake3_done (GObject * obj, + GAsyncResult * res, gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (obj); + GstRtmpServerConnection *sc = GST_RTMP_SERVER_CONNECTION (user_data); + GError *error = NULL; + GBytes *bytes; + + GST_ERROR ("gst_rtmp_server_connection_handshake3_done"); + + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { + GST_ERROR ("read error: %s", error->message); + g_error_free (error); + return; + } + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + gst_rtmp_server_connection_read_chunk (sc); + (void) &proxy_connect; +} diff --git a/rtmp/rtmpserverconnection.h b/rtmp/rtmpserverconnection.h new file mode 100644 index 0000000..ab1f53c --- /dev/null +++ b/rtmp/rtmpserverconnection.h @@ -0,0 +1,60 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_SERVER_CONNECTION_H_ +#define _GST_RTMP_SERVER_CONNECTION_H_ + +#include <gio/gio.h> + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP_SERVER_CONNECTION (gst_rtmp_server_connection_get_type()) +#define GST_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnection)) +#define GST_RTMP_SERVER_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_SERVER_CONNECTION,GstRtmpServerConnectionClass)) +#define GST_IS_RTMP_SERVER_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_SERVER_CONNECTION)) +#define GST_IS_RTMP_SERVER_CONNECTION_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_SERVER_CONNECTION)) + +typedef struct _GstRtmpServerConnection GstRtmpServerConnection; +typedef struct _GstRtmpServerConnectionClass GstRtmpServerConnectionClass; + +struct _GstRtmpServerConnection +{ + GObject object; + + /* private */ + GSocketConnection *connection; + GSocketConnection *proxy_connection; + GCancellable *cancellable; + int state; + GSocketClient *socket_client; +}; + +struct _GstRtmpServerConnectionClass +{ + GObjectClass object_class; +}; + +GType gst_rtmp_server_connection_get_type (void); + +GstRtmpServerConnection *gst_rtmp_server_connection_new ( + GSocketConnection *connection); + +G_END_DECLS + +#endif diff --git a/rtmp/rtmppacket.c b/rtmp/rtmpstream.c index de342c5..961e09c 100644 --- a/rtmp/rtmppacket.c +++ b/rtmp/rtmpstream.c @@ -22,20 +22,20 @@ #endif #include <gst/gst.h> -#include "rtmppacket.h" +#include "rtmpstream.h" -GST_DEBUG_CATEGORY_STATIC (gst_rtmp_packet_debug_category); -#define GST_CAT_DEFAULT gst_rtmp_packet_debug_category +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_stream_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_stream_debug_category /* prototypes */ -static void gst_rtmp_packet_set_property (GObject * object, +static void gst_rtmp_stream_set_property (GObject * object, guint property_id, const GValue * value, GParamSpec * pspec); -static void gst_rtmp_packet_get_property (GObject * object, +static void gst_rtmp_stream_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); -static void gst_rtmp_packet_dispose (GObject * object); -static void gst_rtmp_packet_finalize (GObject * object); +static void gst_rtmp_stream_dispose (GObject * object); +static void gst_rtmp_stream_finalize (GObject * object); enum @@ -45,34 +45,34 @@ enum /* class initialization */ -G_DEFINE_TYPE_WITH_CODE (GstRtmpPacket, gst_rtmp_packet, G_TYPE_OBJECT, - GST_DEBUG_CATEGORY_INIT (gst_rtmp_packet_debug_category, "rtmppacket", 0, - "debug category for rtmppacket element")); +G_DEFINE_TYPE_WITH_CODE (GstRtmpStream, gst_rtmp_stream, G_TYPE_OBJECT, + GST_DEBUG_CATEGORY_INIT (gst_rtmp_stream_debug_category, "rtmpstream", 0, + "debug category for rtmpstream element")); static void -gst_rtmp_packet_class_init (GstRtmpPacketClass * klass) +gst_rtmp_stream_class_init (GstRtmpStreamClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - gobject_class->set_property = gst_rtmp_packet_set_property; - gobject_class->get_property = gst_rtmp_packet_get_property; - gobject_class->dispose = gst_rtmp_packet_dispose; - gobject_class->finalize = gst_rtmp_packet_finalize; + gobject_class->set_property = gst_rtmp_stream_set_property; + gobject_class->get_property = gst_rtmp_stream_get_property; + gobject_class->dispose = gst_rtmp_stream_dispose; + gobject_class->finalize = gst_rtmp_stream_finalize; } static void -gst_rtmp_packet_init (GstRtmpPacket * rtmppacket) +gst_rtmp_stream_init (GstRtmpStream * rtmpstream) { } void -gst_rtmp_packet_set_property (GObject * object, guint property_id, +gst_rtmp_stream_set_property (GObject * object, guint property_id, const GValue * value, GParamSpec * pspec) { - GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object); + GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object); - GST_DEBUG_OBJECT (rtmppacket, "set_property"); + GST_DEBUG_OBJECT (rtmpstream, "set_property"); switch (property_id) { default: @@ -82,12 +82,12 @@ gst_rtmp_packet_set_property (GObject * object, guint property_id, } void -gst_rtmp_packet_get_property (GObject * object, guint property_id, +gst_rtmp_stream_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec) { - GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object); + GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object); - GST_DEBUG_OBJECT (rtmppacket, "get_property"); + GST_DEBUG_OBJECT (rtmpstream, "get_property"); switch (property_id) { default: @@ -97,25 +97,25 @@ gst_rtmp_packet_get_property (GObject * object, guint property_id, } void -gst_rtmp_packet_dispose (GObject * object) +gst_rtmp_stream_dispose (GObject * object) { - GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object); + GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object); - GST_DEBUG_OBJECT (rtmppacket, "dispose"); + GST_DEBUG_OBJECT (rtmpstream, "dispose"); /* clean up as possible. may be called multiple times */ - G_OBJECT_CLASS (gst_rtmp_packet_parent_class)->dispose (object); + G_OBJECT_CLASS (gst_rtmp_stream_parent_class)->dispose (object); } void -gst_rtmp_packet_finalize (GObject * object) +gst_rtmp_stream_finalize (GObject * object) { - GstRtmpPacket *rtmppacket = GST_RTMP_PACKET (object); + GstRtmpStream *rtmpstream = GST_RTMP_STREAM (object); - GST_DEBUG_OBJECT (rtmppacket, "finalize"); + GST_DEBUG_OBJECT (rtmpstream, "finalize"); /* clean up object here */ - G_OBJECT_CLASS (gst_rtmp_packet_parent_class)->finalize (object); + G_OBJECT_CLASS (gst_rtmp_stream_parent_class)->finalize (object); } diff --git a/rtmp/rtmpstream.h b/rtmp/rtmpstream.h new file mode 100644 index 0000000..00dc71e --- /dev/null +++ b/rtmp/rtmpstream.h @@ -0,0 +1,52 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_STREAM_H_ +#define _GST_RTMP_STREAM_H_ + + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP_STREAM (gst_rtmp_stream_get_type()) +#define GST_RTMP_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_STREAM,GstRtmpStream)) +#define GST_RTMP_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTMP_STREAM,GstRtmpStreamClass)) +#define GST_IS_RTMP_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_STREAM)) +#define GST_IS_RTMP_STREAM_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTMP_STREAM)) + +typedef struct _GstRtmpStream GstRtmpStream; +typedef struct _GstRtmpStreamClass GstRtmpStreamClass; + +struct _GstRtmpStream +{ + GObject object; + + int stream_id; + +}; + +struct _GstRtmpStreamClass +{ + GObjectClass object_class; +}; + +GType gst_rtmp_stream_get_type (void); + +G_END_DECLS + +#endif diff --git a/tools/Makefile.am b/tools/Makefile.am index e69de29..e81c2df 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -0,0 +1,12 @@ + + +noinst_PROGRAMS = client-test proxy-server + +client_test_SOURCES = client-test.c +client_test_CFLAGS = $(GST_RTMP_CFLAGS) $(GST_CFLAGS) -I../rtmp +client_test_LDADD = ../rtmp/libgstrtmp-1.0.la $(GST_LIBS) + +proxy_server_SOURCES = proxy-server.c +proxy_server_CFLAGS = $(GST_RTMP_CFLAGS) $(GST_CFLAGS) -I../rtmp +proxy_server_LDADD = ../rtmp/libgstrtmp-1.0.la $(GST_LIBS) + diff --git a/tools/client-test.c b/tools/client-test.c new file mode 100644 index 0000000..8a0e9f4 --- /dev/null +++ b/tools/client-test.c @@ -0,0 +1,88 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include <gio/gio.h> +#include <stdlib.h> +#include "rtmpclient.h" + +#define GETTEXT_PACKAGE NULL + +gboolean verbose; + +static GOptionEntry entries[] = { + {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL}, + {NULL} +}; + +static void +connect_done (GObject *source, GAsyncResult *result, gpointer user_data); + +int +main (int argc, char *argv[]) +{ + GError *error = NULL; + GOptionContext *context; + GMainLoop *main_loop; + GstRtmpClient *client; + GCancellable *cancellable; + + context = g_option_context_new ("- FIXME"); + g_option_context_add_main_entries (context, entries, GETTEXT_PACKAGE); + g_option_context_add_group (context, gst_init_get_option_group ()); + if (!g_option_context_parse (context, &argc, &argv, &error)) { + g_print ("option parsing failed: %s\n", error->message); + exit (1); + } + g_option_context_free (context); + + + client = gst_rtmp_client_new (); + cancellable = g_cancellable_new (); + + main_loop = g_main_loop_new (NULL, TRUE); + + gst_rtmp_client_connect_async (client, cancellable, connect_done, + client); + + g_main_loop_run (main_loop); + + exit (0); +} + +static void +connect_done (GObject *source, GAsyncResult *result, gpointer user_data) +{ + GstRtmpClient *client = user_data; + GError *error = NULL; + gboolean ret; + + ret = gst_rtmp_client_connect_finish (client, result, &error); + if (!ret) { + GST_ERROR("error: %s", error->message); + g_error_free (error); + } + + GST_ERROR("got here"); +} + diff --git a/tools/proxy-server.c b/tools/proxy-server.c new file mode 100644 index 0000000..3920d5f --- /dev/null +++ b/tools/proxy-server.c @@ -0,0 +1,64 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include <gio/gio.h> +#include <stdlib.h> +#include "rtmpserver.h" + +#define GETTEXT_PACKAGE NULL + +gboolean verbose; + +static GOptionEntry entries[] = { + {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL}, + {NULL} +}; + + +int +main (int argc, char *argv[]) +{ + GError *error = NULL; + GOptionContext *context; + GMainLoop *main_loop; + GstRtmpServer *server; + + context = g_option_context_new ("- FIXME"); + g_option_context_add_main_entries (context, entries, GETTEXT_PACKAGE); + g_option_context_add_group (context, gst_init_get_option_group ()); + if (!g_option_context_parse (context, &argc, &argv, &error)) { + g_print ("option parsing failed: %s\n", error->message); + exit (1); + } + g_option_context_free (context); + + server = gst_rtmp_server_new (); + gst_rtmp_server_start (server); + + main_loop = g_main_loop_new (NULL, TRUE); + g_main_loop_run (main_loop); + + exit (0); +} + |