summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-30 13:17:13 -0700
committerDavid Schleef <ds@schleef.org>2014-08-30 13:17:13 -0700
commitcff0d57190f3f0b9c9d0c30012b959f12055da08 (patch)
tree2eadf183af3c3d6b1062eebd3f06053be16e0378
parentbf029aebe192f30de5d79281d0b8c44f94cfd233 (diff)
hacking
-rw-r--r--configure.ac2
-rw-r--r--plugins/Makefile.am6
-rw-r--r--plugins/gstrtmp2.c8
-rw-r--r--plugins/gstrtmp2sink.c5
-rw-r--r--plugins/gstrtmp2src.c372
-rw-r--r--plugins/gstrtmp2src.h20
-rw-r--r--rtmp/amf.c81
-rw-r--r--rtmp/amf.h10
-rw-r--r--rtmp/rtmpchunk.c63
-rw-r--r--rtmp/rtmpchunk.h5
-rw-r--r--rtmp/rtmpclient.c1
-rw-r--r--rtmp/rtmpconnection.c144
-rw-r--r--rtmp/rtmpconnection.h21
-rw-r--r--tools/client-test.c150
-rw-r--r--tools/proxy-server.c4
15 files changed, 755 insertions, 137 deletions
diff --git a/configure.ac b/configure.ac
index 357f186..3190a98 100644
--- a/configure.ac
+++ b/configure.ac
@@ -131,7 +131,7 @@ AM_CONDITIONAL(HAVE_FALSE, false)
GST_RTMP_CFLAGS="$GST_RTMP_CFLAGS -I\$(top_srcdir)"
AC_SUBST(GST_RTMP_CFLAGS)
-GST_RTMP_LIBS="\$(top_builddir)/gst-rtmp/libgss-$GST_API_VERSION.la"
+GST_RTMP_LIBS="\$(top_builddir)/rtmp/libgstrtmp-$GST_API_VERSION.la"
AC_SUBST(GST_RTMP_LIBS)
diff --git a/plugins/Makefile.am b/plugins/Makefile.am
index 43990ca..885da90 100644
--- a/plugins/Makefile.am
+++ b/plugins/Makefile.am
@@ -1,13 +1,13 @@
-plugindir = ${prefix}/gstreamer-1.0
+plugindir = ${libdir}/gstreamer-1.0
plugin_LTLIBRARIES = libgstrtmp2.la
libgstrtmp2_la_SOURCES = gstrtmp2src.c gstrtmp2sink.c gstrtmp2.c
noinst_HEADERS = gstrtmp2src.h gstrtmp2sink.h
-libgstrtmp2_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(RTMP_CFLAGS)
-libgstrtmp2_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS) $(RTMP_LIBS) $(WINSOCK2_LIBS)
+libgstrtmp2_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GST_RTMP_CFLAGS)
+libgstrtmp2_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS) $(GST_RTMP_LIBS) $(WINSOCK2_LIBS)
libgstrtmp2_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstrtmp2_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
diff --git a/plugins/gstrtmp2.c b/plugins/gstrtmp2.c
index dc82302..38da527 100644
--- a/plugins/gstrtmp2.c
+++ b/plugins/gstrtmp2.c
@@ -25,14 +25,12 @@
#include "gstrtmp2src.h"
#include "gstrtmp2sink.h"
-GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
-#define GST_CAT_DEFAULT gst_rtmp2_src_debug_category
-
static gboolean
plugin_init (GstPlugin * plugin)
{
- gst_element_register (plugin, "rtmp2src", GST_RANK_NONE, GST_TYPE_RTMP2_SRC);
- gst_element_register (plugin, "rtmp2sink", GST_RANK_NONE,
+ gst_element_register (plugin, "rtmp2src", GST_RANK_PRIMARY + 1,
+ GST_TYPE_RTMP2_SRC);
+ gst_element_register (plugin, "rtmp2sink", GST_RANK_PRIMARY + 1,
GST_TYPE_RTMP2_SINK);
return TRUE;
diff --git a/plugins/gstrtmp2sink.c b/plugins/gstrtmp2sink.c
index eacb268..b6d4713 100644
--- a/plugins/gstrtmp2sink.c
+++ b/plugins/gstrtmp2sink.c
@@ -19,7 +19,8 @@
/**
* SECTION:element-gstrtmp2sink
*
- * The rtmp2sink element does FIXME stuff.
+ * The rtmp2sink element sends audio and video streams to an RTMP
+ * server.
*
* <refsect2>
* <title>Example launch line</title>
@@ -90,7 +91,7 @@ static GstStaticPadTemplate gst_rtmp2_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
- GST_STATIC_CAPS ("application/unknown")
+ GST_STATIC_CAPS ("video/x-flv")
);
diff --git a/plugins/gstrtmp2src.c b/plugins/gstrtmp2src.c
index 6b281c2..bca8ba4 100644
--- a/plugins/gstrtmp2src.c
+++ b/plugins/gstrtmp2src.c
@@ -19,12 +19,12 @@
/**
* SECTION:element-gstrtmp2src
*
- * The rtmp2src element does FIXME stuff.
+ * The rtmp2src element receives input streams from an RTMP server.
*
* <refsect2>
* <title>Example launch line</title>
* |[
- * gst-launch -v fakesrc ! rtmp2src ! FIXME ! fakesink
+ * gst-launch -v rtmp2src ! decodebin ! fakesink
* ]|
* FIXME Describe what the pipeline does.
* </refsect2>
@@ -36,7 +36,10 @@
#include <gst/gst.h>
#include <gst/base/gstbasesrc.h>
+#include <gst/base/gstpushsrc.h>
#include "gstrtmp2src.h"
+#include <rtmp/rtmpchunk.h>
+#include <string.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
#define GST_CAT_DEFAULT gst_rtmp2_src_debug_category
@@ -50,13 +53,10 @@ static void gst_rtmp2_src_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp2_src_dispose (GObject * object);
static void gst_rtmp2_src_finalize (GObject * object);
+static void gst_rtmp2_src_uri_handler_init (gpointer g_iface,
+ gpointer iface_data);
-static GstCaps *gst_rtmp2_src_get_caps (GstBaseSrc * src, GstCaps * filter);
-static gboolean gst_rtmp2_src_negotiate (GstBaseSrc * src);
-static GstCaps *gst_rtmp2_src_fixate (GstBaseSrc * src, GstCaps * caps);
-static gboolean gst_rtmp2_src_set_caps (GstBaseSrc * src, GstCaps * caps);
-static gboolean gst_rtmp2_src_decide_allocation (GstBaseSrc * src,
- GstQuery * query);
+static void gst_rtmp2_src_task (gpointer user_data);
static gboolean gst_rtmp2_src_start (GstBaseSrc * src);
static gboolean gst_rtmp2_src_stop (GstBaseSrc * src);
static void gst_rtmp2_src_get_times (GstBaseSrc * src, GstBuffer * buffer,
@@ -74,8 +74,25 @@ static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset,
guint size, GstBuffer ** buf);
static GstFlowReturn gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset,
guint size, GstBuffer ** buf);
-static GstFlowReturn gst_rtmp2_src_fill (GstBaseSrc * src, guint64 offset,
- guint size, GstBuffer * buf);
+
+static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data);
+static void connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void send_connect (GstRtmp2Src * src);
+static void cmd_connect_done (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk, const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args,
+ gpointer user_data);
+static void send_create_stream (GstRtmp2Src * src);
+static void create_stream_done (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk, const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args,
+ gpointer user_data);
+static void send_play (GstRtmp2Src * src);
+static void play_done (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, gpointer user_data);
enum
{
@@ -88,15 +105,18 @@ static GstStaticPadTemplate gst_rtmp2_src_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
- GST_STATIC_CAPS ("application/unknown")
+ GST_STATIC_CAPS ("video/x-flv")
);
/* class initialization */
-G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_BASE_SRC,
- GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
- "debug category for rtmp2src element"));
+G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC,
+ do {
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
+ gst_rtmp2_src_uri_handler_init);
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
+ "debug category for rtmp2src element");} while (0));
static void
gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
@@ -110,19 +130,13 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
gst_static_pad_template_get (&gst_rtmp2_src_src_template));
gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
- "FIXME Long name", "Generic", "FIXME Description",
- "FIXME <fixme@example.com>");
+ "RTMP source element", "Source", "Source element for RTMP streams",
+ "David Schleef <ds@schleef.org>");
gobject_class->set_property = gst_rtmp2_src_set_property;
gobject_class->get_property = gst_rtmp2_src_get_property;
gobject_class->dispose = gst_rtmp2_src_dispose;
gobject_class->finalize = gst_rtmp2_src_finalize;
- base_src_class->get_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_src_get_caps);
- base_src_class->negotiate = GST_DEBUG_FUNCPTR (gst_rtmp2_src_negotiate);
- base_src_class->fixate = GST_DEBUG_FUNCPTR (gst_rtmp2_src_fixate);
- base_src_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_src_set_caps);
- base_src_class->decide_allocation =
- GST_DEBUG_FUNCPTR (gst_rtmp2_src_decide_allocation);
base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start);
base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop);
base_src_class->get_times = GST_DEBUG_FUNCPTR (gst_rtmp2_src_get_times);
@@ -133,17 +147,70 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
base_src_class->do_seek = GST_DEBUG_FUNCPTR (gst_rtmp2_src_do_seek);
base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock);
base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop);
- base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query);
+ if (0)
+ base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query);
base_src_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_src_event);
base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create);
base_src_class->alloc = GST_DEBUG_FUNCPTR (gst_rtmp2_src_alloc);
- base_src_class->fill = GST_DEBUG_FUNCPTR (gst_rtmp2_src_fill);
}
static void
gst_rtmp2_src_init (GstRtmp2Src * rtmp2src)
{
+ g_mutex_init (&rtmp2src->lock);
+ rtmp2src->queue = g_queue_new ();
+
+ //gst_base_src_set_live (GST_BASE_SRC(rtmp2src), TRUE);
+
+ rtmp2src->task = gst_task_new (gst_rtmp2_src_task, rtmp2src, NULL);
+ g_rec_mutex_init (&rtmp2src->task_lock);
+ gst_task_set_lock (rtmp2src->task, &rtmp2src->task_lock);
+ rtmp2src->client = gst_rtmp_client_new ();
+ gst_rtmp_client_set_server_address (rtmp2src->client,
+ "ec2-54-185-55-241.us-west-2.compute.amazonaws.com");
+
+}
+
+static GstURIType
+gst_rtmp2_src_uri_get_type (GType type)
+{
+ return GST_URI_SRC;
+}
+
+static const gchar *const *
+gst_rtmp2_src_uri_get_protocols (GType type)
+{
+ static const gchar *protocols[] = { "rtmp", NULL };
+
+ return protocols;
+}
+
+static gchar *
+gst_rtmp2_src_uri_get_uri (GstURIHandler * handler)
+{
+ GstRtmp2Src *src = GST_RTMP2_SRC (handler);
+
+ /* FIXME: make thread-safe */
+ return g_strdup (src->uri);
+}
+
+static gboolean
+gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
+ GError ** error)
+{
+ return TRUE;
+}
+
+static void
+gst_rtmp2_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
+{
+ GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+
+ iface->get_type = gst_rtmp2_src_uri_get_type;
+ iface->get_protocols = gst_rtmp2_src_uri_get_protocols;
+ iface->get_uri = gst_rtmp2_src_uri_get_uri;
+ iface->set_uri = gst_rtmp2_src_uri_set_uri;
}
void
@@ -196,74 +263,227 @@ gst_rtmp2_src_finalize (GObject * object)
GST_DEBUG_OBJECT (rtmp2src, "finalize");
/* clean up object here */
+ g_object_unref (rtmp2src->task);
+ g_rec_mutex_clear (&rtmp2src->task_lock);
+ g_object_unref (rtmp2src->client);
+ g_mutex_clear (&rtmp2src->lock);
+ g_queue_free_full (rtmp2src->queue, g_object_unref);
G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object);
}
-/* get caps from subclass */
-static GstCaps *
-gst_rtmp2_src_get_caps (GstBaseSrc * src, GstCaps * filter)
+/* start and stop processing, ideal for opening/closing the resource */
+static gboolean
+gst_rtmp2_src_start (GstBaseSrc * src)
{
GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
- GST_DEBUG_OBJECT (rtmp2src, "get_caps");
+ GST_DEBUG_OBJECT (rtmp2src, "start");
- return NULL;
+ gst_task_start (rtmp2src->task);
+
+ return TRUE;
}
-/* decide on caps */
-static gboolean
-gst_rtmp2_src_negotiate (GstBaseSrc * src)
+static void
+gst_rtmp2_src_task (gpointer user_data)
{
- GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
+ GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data);
+ GMainLoop *main_loop;
+ GMainContext *main_context;
+
+ gst_rtmp_client_connect_async (rtmp2src->client, NULL, connect_done,
+ rtmp2src);
+
+ main_context = g_main_context_new ();
+ main_loop = g_main_loop_new (main_context, TRUE);
+ g_main_loop_run (main_loop);
+ g_main_loop_unref (main_loop);
+ g_main_context_unref (main_context);
+}
- GST_DEBUG_OBJECT (rtmp2src, "negotiate");
+static void
+connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
+{
+ GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data);
+ GError *error = NULL;
+ gboolean ret;
+
+ ret = gst_rtmp_client_connect_finish (rtmp2src->client, result, &error);
+ if (!ret) {
+ GST_ERROR ("error: %s", error->message);
+ g_error_free (error);
+ return;
+ }
- return TRUE;
+ rtmp2src->connection = gst_rtmp_client_get_connection (rtmp2src->client);
+ g_signal_connect (rtmp2src->connection, "got-chunk", G_CALLBACK (got_chunk),
+ rtmp2src);
+
+ send_connect (rtmp2src);
}
-/* called if, in negotiation, caps need fixating */
-static GstCaps *
-gst_rtmp2_src_fixate (GstBaseSrc * src, GstCaps * caps)
+static void
+dump_command (GstRtmpChunk * chunk)
{
- GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
+ GstAmfNode *amf;
+ gsize size;
+ const guint8 *data;
+ gsize n_parsed;
+ int offset;
+
+ offset = 0;
+ data = g_bytes_get_data (chunk->payload, &size);
+ while (offset < size) {
+ amf = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ gst_amf_node_dump (amf);
+ gst_amf_node_free (amf);
+ offset += n_parsed;
+ }
+}
+
+static void
+dump_chunk (GstRtmpChunk * chunk, gboolean dir)
+{
+ g_print ("%s stream_id:%-4d ts:%-8d len:%-6" G_GSIZE_FORMAT
+ " type_id:%-4d info:%08x\n", dir ? ">>>" : "<<<",
+ chunk->stream_id,
+ chunk->timestamp,
+ chunk->message_length, chunk->message_type_id, chunk->info);
+ if (chunk->message_type_id == 20) {
+ dump_command (chunk);
+ }
+ if (chunk->message_type_id == 18) {
+ dump_command (chunk);
+ }
+ gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk));
+}
- GST_DEBUG_OBJECT (rtmp2src, "fixate");
+static void
+got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data)
+{
+ GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data);
+ if (rtmp2src->dump) {
+ dump_chunk (chunk, FALSE);
+ }
+}
- return NULL;
+static void
+send_connect (GstRtmp2Src * rtmp2src)
+{
+ GstAmfNode *node;
+
+ node = gst_amf_node_new (GST_AMF_TYPE_OBJECT);
+ gst_amf_object_set_string (node, "app", "live");
+ gst_amf_object_set_string (node, "type", "nonprivate");
+ gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live");
+ // "fpad": False,
+ // "capabilities": 15,
+ // "audioCodecs": 3191,
+ // "videoCodecs": 252,
+ // "videoFunction": 1,
+ gst_rtmp_connection_send_command (rtmp2src->connection, 3, "connect", 1, node,
+ NULL, cmd_connect_done, rtmp2src);
}
-/* notify the subclass of new caps */
-static gboolean
-gst_rtmp2_src_set_caps (GstBaseSrc * src, GstCaps * caps)
+static void
+cmd_connect_done (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, gpointer user_data)
{
- GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
+ GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data);
+ gboolean ret;
+
+ ret = FALSE;
+ if (optional_args) {
+ const GstAmfNode *n;
+ n = gst_amf_node_get_object (optional_args, "code");
+ if (n) {
+ const char *s;
+ s = gst_amf_node_get_string (n);
+ if (strcmp (s, "NetConnection.Connect.Success") == 0) {
+ ret = TRUE;
+ }
+ }
+ }
- GST_DEBUG_OBJECT (rtmp2src, "set_caps");
+ if (ret) {
+ GST_ERROR ("success");
- return TRUE;
+ send_create_stream (rtmp2src);
+ }
}
-/* setup allocation query */
-static gboolean
-gst_rtmp2_src_decide_allocation (GstBaseSrc * src, GstQuery * query)
+static void
+send_create_stream (GstRtmp2Src * rtmp2src)
{
- GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
+ GstAmfNode *node;
- GST_DEBUG_OBJECT (rtmp2src, "decide_allocation");
+ node = gst_amf_node_new (GST_AMF_TYPE_NULL);
+ gst_rtmp_connection_send_command (rtmp2src->connection, 3, "createStream", 2,
+ node, NULL, create_stream_done, rtmp2src);
- return TRUE;
}
-/* start and stop processing, ideal for opening/closing the resource */
-static gboolean
-gst_rtmp2_src_start (GstBaseSrc * src)
+static void
+create_stream_done (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, gpointer user_data)
{
- GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
+ GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data);
+ gboolean ret;
+ int stream_id;
+
+ ret = FALSE;
+ if (optional_args) {
+ stream_id = gst_amf_node_get_number (optional_args);
+ ret = TRUE;
+ }
- GST_DEBUG_OBJECT (rtmp2src, "start");
+ if (ret) {
+ GST_ERROR ("createStream success, stream_id=%d", stream_id);
- return TRUE;
+ send_play (rtmp2src);
+ }
+}
+
+static void
+send_play (GstRtmp2Src * rtmp2src)
+{
+ GstAmfNode *n1;
+ GstAmfNode *n2;
+ GstAmfNode *n3;
+
+ n1 = gst_amf_node_new (GST_AMF_TYPE_NULL);
+ n2 = gst_amf_node_new (GST_AMF_TYPE_STRING);
+ gst_amf_node_set_string (n2, "myStream");
+ n3 = gst_amf_node_new (GST_AMF_TYPE_NUMBER);
+ gst_amf_node_set_number (n3, 0);
+ gst_rtmp_connection_send_command2 (rtmp2src->connection, 8, 1, "play", 3, n1,
+ n2, n3, NULL, play_done, rtmp2src);
+
+}
+
+static void
+play_done (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, gpointer user_data)
+{
+ //GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data);
+ gboolean ret;
+ int stream_id;
+
+ ret = FALSE;
+ if (optional_args) {
+ stream_id = gst_amf_node_get_number (optional_args);
+ ret = TRUE;
+ }
+
+ if (ret) {
+ GST_ERROR ("play success, stream_id=%d", stream_id);
+
+ }
}
static gboolean
@@ -307,7 +527,7 @@ gst_rtmp2_src_is_seekable (GstBaseSrc * src)
GST_DEBUG_OBJECT (rtmp2src, "is_seekable");
- return TRUE;
+ return FALSE;
}
/* Prepare the segment on which to perform do_seek(), converting to the
@@ -386,9 +606,29 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
GstBuffer ** buf)
{
GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
+ GstRtmpChunk *chunk;
+ const char *data;
+ gsize payload_size;
GST_DEBUG_OBJECT (rtmp2src, "create");
+ g_mutex_lock (&rtmp2src->lock);
+ chunk = g_queue_pop_head (rtmp2src->queue);
+ while (!chunk) {
+ if (rtmp2src->reset) {
+ g_mutex_unlock (&rtmp2src->lock);
+ return GST_FLOW_ERROR;
+ }
+ g_cond_wait (&rtmp2src->cond, &rtmp2src->lock);
+ chunk = g_queue_pop_head (rtmp2src->queue);
+ }
+ g_mutex_unlock (&rtmp2src->lock);
+
+ data = g_bytes_get_data (chunk->payload, &payload_size);
+ *buf =
+ gst_buffer_new_wrapped_full (0, (gpointer) data, payload_size, 0,
+ payload_size, chunk, g_object_unref);
+
return GST_FLOW_OK;
}
@@ -404,15 +644,3 @@ gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset, guint size,
return GST_FLOW_OK;
}
-
-/* ask the subclass to fill the buffer with data from offset and size */
-static GstFlowReturn
-gst_rtmp2_src_fill (GstBaseSrc * src, guint64 offset, guint size,
- GstBuffer * buf)
-{
- GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src);
-
- GST_DEBUG_OBJECT (rtmp2src, "fill");
-
- return GST_FLOW_OK;
-}
diff --git a/plugins/gstrtmp2src.h b/plugins/gstrtmp2src.h
index 59d7511..6529a6e 100644
--- a/plugins/gstrtmp2src.h
+++ b/plugins/gstrtmp2src.h
@@ -20,7 +20,9 @@
#ifndef _GST_RTMP2_SRC_H_
#define _GST_RTMP2_SRC_H_
-#include <gst/base/gstbasesrc.h>
+#include <gst/base/gstpushsrc.h>
+#include <rtmp/rtmpclient.h>
+#include <rtmp/rtmputils.h>
G_BEGIN_DECLS
@@ -35,13 +37,25 @@ typedef struct _GstRtmp2SrcClass GstRtmp2SrcClass;
struct _GstRtmp2Src
{
- GstBaseSrc base_rtmp2src;
+ GstPushSrc base_rtmp2src;
+ char *uri;
+
+ GMutex lock;
+ GCond cond;
+ GQueue *queue;
+ gboolean reset;
+ GstTask *task;
+ GRecMutex task_lock;
+
+ GstRtmpClient *client;
+ GstRtmpConnection *connection;
+ gboolean dump;
};
struct _GstRtmp2SrcClass
{
- GstBaseSrcClass base_rtmp2src_class;
+ GstPushSrcClass base_rtmp2src_class;
};
GType gst_rtmp2_src_get_type (void);
diff --git a/rtmp/amf.c b/rtmp/amf.c
index a510e67..c942fea 100644
--- a/rtmp/amf.c
+++ b/rtmp/amf.c
@@ -552,11 +552,13 @@ gst_amf_serialize_command (const char *command_name, int transaction_id,
serializer->size = 4096;
serializer->data = g_malloc (serializer->size);
+ _serialize_u8 (serializer, GST_AMF_TYPE_STRING);
_serialize_utf8_string (serializer, command_name);
+ _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER);
_serialize_number (serializer, transaction_id);
- _serialize_object (serializer, command_object);
+ _serialize_value (serializer, command_object);
if (optional_args)
- _serialize_object (serializer, optional_args);
+ _serialize_value (serializer, optional_args);
if (serializer->error) {
GST_ERROR ("failed to serialize");
@@ -565,3 +567,78 @@ gst_amf_serialize_command (const char *command_name, int transaction_id,
}
return g_bytes_new_take (serializer->data, serializer->offset);
}
+
+GBytes *
+gst_amf_serialize_command2 (const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args, GstAmfNode * n3,
+ GstAmfNode * n4)
+{
+ AmfSerializer _s = { 0 }, *serializer = &_s;
+
+ serializer->size = 4096;
+ serializer->data = g_malloc (serializer->size);
+
+ _serialize_u8 (serializer, GST_AMF_TYPE_STRING);
+ _serialize_utf8_string (serializer, command_name);
+ _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER);
+ _serialize_number (serializer, transaction_id);
+ _serialize_value (serializer, command_object);
+ if (optional_args)
+ _serialize_value (serializer, optional_args);
+ if (n3)
+ _serialize_value (serializer, n3);
+ if (n4)
+ _serialize_value (serializer, n4);
+
+ if (serializer->error) {
+ GST_ERROR ("failed to serialize");
+ g_free (serializer->data);
+ return NULL;
+ }
+ return g_bytes_new_take (serializer->data, serializer->offset);
+}
+
+gboolean
+gst_amf_node_get_boolean (const GstAmfNode * node)
+{
+ return node->int_val;
+}
+
+const char *
+gst_amf_node_get_string (const GstAmfNode * node)
+{
+ return node->string_val;
+}
+
+double
+gst_amf_node_get_number (const GstAmfNode * node)
+{
+ return node->double_val;
+}
+
+const GstAmfNode *
+gst_amf_node_get_object (const GstAmfNode * node, const char *field_name)
+{
+ int i;
+ for (i = 0; i < node->array_val->len; i++) {
+ AmfObjectField *field = g_ptr_array_index (node->array_val, i);
+ if (strcmp (field->name, field_name) == 0) {
+ return field->value;
+ }
+ }
+ return NULL;
+}
+
+int
+gst_amf_node_get_object_length (const GstAmfNode * node)
+{
+ return node->array_val->len;
+}
+
+const GstAmfNode *
+gst_amf_node_get_object_by_index (const GstAmfNode * node, int i)
+{
+ AmfObjectField *field;
+ field = g_ptr_array_index (node->array_val, i);
+ return field->value;
+}
diff --git a/rtmp/amf.h b/rtmp/amf.h
index 0ace7d3..4ab9676 100644
--- a/rtmp/amf.h
+++ b/rtmp/amf.h
@@ -70,6 +70,13 @@ void gst_amf_node_set_ecma_array (GstAmfNode *node, guint8 *data, int size);
void gst_amf_object_append_take (GstAmfNode *node, const char *s,
GstAmfNode *child_node);
+gboolean gst_amf_node_get_boolean (const GstAmfNode *node);
+const char *gst_amf_node_get_string (const GstAmfNode *node);
+double gst_amf_node_get_number (const GstAmfNode *node);
+const GstAmfNode *gst_amf_node_get_object (const GstAmfNode *node, const char *field_name);
+int gst_amf_node_get_object_length (const GstAmfNode *node);
+const GstAmfNode *gst_amf_node_get_object_by_index (const GstAmfNode *node, int i);
+
void gst_amf_object_set_number (GstAmfNode *node, const char *field_name,
double val);
void gst_amf_object_set_string (GstAmfNode *node, const char *field_name,
@@ -77,6 +84,9 @@ void gst_amf_object_set_string (GstAmfNode *node, const char *field_name,
GBytes * gst_amf_serialize_command (const char *command_name,
int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args);
+GBytes * gst_amf_serialize_command2 (const char *command_name,
+ int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args,
+ GstAmfNode *n3, GstAmfNode *n4);
G_END_DECLS
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index b23fe45..6a22c9e 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -193,8 +193,8 @@ gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes,
header->message_type_id = data[offset];
offset += 1;
- header->info = (data[offset] << 24) | (data[offset + 1] << 16) |
- (data[offset + 2] << 8) | data[offset + 3];
+ header->info = (data[offset + 3] << 24) | (data[offset + 2] << 16) |
+ (data[offset + 1] << 8) | data[offset];
offset += 4;
} else {
header->timestamp = previous_header->timestamp;
@@ -262,10 +262,10 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk,
data[5] = (chunk->message_length >> 8) & 0xff;
data[6] = chunk->message_length & 0xff;
data[7] = chunk->message_type_id;
- data[8] = (chunk->info >> 24) & 0xff;
- data[9] = (chunk->info >> 16) & 0xff;
- data[10] = (chunk->info >> 8) & 0xff;
- data[11] = chunk->info & 0xff;
+ data[8] = chunk->info & 0xff;
+ data[9] = (chunk->info >> 8) & 0xff;
+ data[10] = (chunk->info >> 16) & 0xff;
+ data[11] = (chunk->info >> 24) & 0xff;
offset = 12;
} else {
data[1] = (timestamp >> 16) & 0xff;
@@ -377,3 +377,54 @@ gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry,
entry->previous_header.message_type_id = chunk->message_type_id;
entry->previous_header.info = chunk->info;
}
+
+gboolean
+gst_rtmp_chunk_parse_message (GstRtmpChunk * chunk, char **command_name,
+ double *transaction_id, GstAmfNode ** command_object,
+ GstAmfNode ** optional_args)
+{
+ gsize n_parsed;
+ const guint8 *data;
+ gsize size;
+ int offset;
+ GstAmfNode *n1, *n2, *n3, *n4;
+
+ offset = 0;
+ data = g_bytes_get_data (chunk->payload, &size);
+ n1 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ offset += n_parsed;
+ n2 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ offset += n_parsed;
+ n3 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ offset += n_parsed;
+ if (offset < size) {
+ n4 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ } else {
+ n4 = NULL;
+ }
+
+ if (command_name) {
+ *command_name = g_strdup (gst_amf_node_get_string (n1));
+ }
+ gst_amf_node_free (n1);
+
+ if (transaction_id) {
+ *transaction_id = gst_amf_node_get_number (n2);
+ }
+ gst_amf_node_free (n2);
+
+ if (command_object) {
+ *command_object = n3;
+ } else {
+ gst_amf_node_free (n3);
+ }
+
+ if (optional_args) {
+ *optional_args = n4;
+ } else {
+ if (n4)
+ gst_amf_node_free (n4);
+ }
+
+ return TRUE;
+}
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 2fa75ca..8b2b707 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -21,6 +21,7 @@
#define _GST_RTMP_CHUNK_H_
#include <glib.h>
+#include "rtmp/amf.h"
G_BEGIN_DECLS
@@ -99,6 +100,10 @@ GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk);
gboolean gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader *header, GBytes * bytes);
gboolean gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader *header, GBytes * bytes,
GstRtmpChunkHeader *previous_header);
+gboolean gst_rtmp_chunk_parse_message (GstRtmpChunk *chunk,
+ char **command_name, double *transaction_id,
+ GstAmfNode **command_object, GstAmfNode **optional_args);
+
/* chunk cache */
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 8b9eae8..241161e 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -206,6 +206,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
addr = g_network_address_new (client->server_address, client->server_port);
client->socket_client = g_socket_client_new ();
+ g_socket_client_set_timeout (client->socket_client, 5);
GST_DEBUG ("g_socket_client_connect_async");
g_socket_client_connect_async (client->socket_client, addr,
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 5ac713a..4952d29 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -40,6 +40,7 @@ static void gst_rtmp_connection_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object);
+static void gst_rtmp_connection_got_closed (GstRtmpConnection * connection);
static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
gpointer user_data);
static gboolean gst_rtmp_connection_output_ready (GOutputStream * os,
@@ -70,10 +71,20 @@ static void gst_rtmp_connection_start_output (GstRtmpConnection * sc);
static void
gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
GstRtmpChunk * chunk);
-
+static void gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc,
+ GstRtmpChunk * chunk);
static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
+typedef struct _CommandCallback CommandCallback;
+struct _CommandCallback
+{
+ int stream_id;
+ int transaction_id;
+ GstRtmpCommandCallback func;
+ gpointer user_data;
+};
+
enum
{
PROP_0
@@ -100,6 +111,13 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass,
got_chunk), NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
+ g_signal_new ("got-control-chunk", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass,
+ got_control_chunk), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
+ g_signal_new ("closed", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, closed),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0);
}
static void
@@ -228,8 +246,7 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
return G_SOURCE_REMOVE;
}
if (ret == 0) {
- /* FIXME probably closed */
- GST_ERROR ("closed?");
+ gst_rtmp_connection_got_closed (sc);
return G_SOURCE_REMOVE;
}
@@ -294,6 +311,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
}
static void
+gst_rtmp_connection_got_closed (GstRtmpConnection * connection)
+{
+ connection->closed = TRUE;
+ g_signal_emit_by_name (connection, "closed");
+}
+
+static void
gst_rtmp_connection_write_chunk_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
@@ -309,8 +333,8 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
ret = g_output_stream_write_bytes_finish (os, res, &error);
if (ret < 0) {
- GST_ERROR ("write error: %s", error->message);
- g_assert_not_reached ();
+ GST_DEBUG ("write error: %s", error->message);
+ gst_rtmp_connection_got_closed (connection);
g_error_free (error);
return;
}
@@ -526,19 +550,7 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
header.message_length);
entry->payload = NULL;
- if (entry->chunk->stream_id == 0x02) {
- GST_DEBUG ("got protocol control message, type: %d",
- entry->chunk->message_type_id);
- gst_rtmp_connection_handle_pcm (sc, entry->chunk);
- g_signal_emit_by_name (sc, "got-control-chunk", entry->chunk);
- } else if (entry->chunk->message_type_id == 0x14) {
- /* FIXME parse command */
- g_signal_emit_by_name (sc, "got-command", entry->chunk);
- } else {
- GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes",
- entry->chunk->message_length);
- g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
- }
+ gst_rtmp_connection_handle_chunk (sc, entry->chunk);
g_object_unref (entry->chunk);
entry->chunk = NULL;
@@ -552,6 +564,48 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
}
static void
+gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc, GstRtmpChunk * chunk)
+{
+ if (chunk->stream_id == 0x02) {
+ GST_DEBUG ("got protocol control message, type: %d",
+ chunk->message_type_id);
+ gst_rtmp_connection_handle_pcm (sc, chunk);
+ g_signal_emit_by_name (sc, "got-control-chunk", chunk);
+ } else {
+ if (chunk->message_type_id == 0x14) {
+ CommandCallback *cb = NULL;
+ GList *g;
+ char *command_name;
+ double transaction_id;
+ GstAmfNode *command_object;
+ GstAmfNode *optional_args;
+
+ gst_rtmp_chunk_parse_message (chunk, &command_name, &transaction_id,
+ &command_object, &optional_args);
+ for (g = sc->command_callbacks; g; g = g_list_next (g)) {
+ cb = g->data;
+ if (cb->stream_id == chunk->stream_id &&
+ cb->transaction_id == transaction_id) {
+ break;
+ }
+ }
+ if (cb) {
+ sc->command_callbacks = g_list_remove (sc->command_callbacks, cb);
+ cb->func (sc, chunk, command_name, transaction_id, command_object,
+ optional_args, cb->user_data);
+ g_free (command_name);
+ gst_amf_node_free (command_object);
+ if (optional_args)
+ gst_amf_node_free (optional_args);
+ g_free (cb);
+ }
+ }
+ GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length);
+ g_signal_emit_by_name (sc, "got-chunk", chunk);
+ }
+}
+
+static void
gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
GstRtmpChunk * chunk)
{
@@ -770,7 +824,8 @@ gst_rtmp_connection_dump (GstRtmpConnection * connection)
int
gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id,
const char *command_name, int transaction_id, GstAmfNode * command_object,
- GstAmfNode * optional_args)
+ GstAmfNode * optional_args, GstRtmpCommandCallback response_command,
+ gpointer user_data)
{
GstRtmpChunk *chunk;
@@ -782,8 +837,59 @@ gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id,
chunk->payload = gst_amf_serialize_command (command_name, transaction_id,
command_object, optional_args);
+ chunk->message_length = g_bytes_get_size (chunk->payload);
gst_rtmp_connection_queue_chunk (connection, chunk);
+ if (response_command) {
+ CommandCallback *callback;
+
+ callback = g_malloc0 (sizeof (CommandCallback));
+ callback->stream_id = stream_id;
+ callback->transaction_id = transaction_id;
+ callback->func = response_command;
+ callback->user_data = user_data;
+
+ connection->command_callbacks =
+ g_list_append (connection->command_callbacks, callback);
+ }
+
+ return transaction_id;
+}
+
+int
+gst_rtmp_connection_send_command2 (GstRtmpConnection * connection,
+ int chunk_stream_id, int stream_id,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, GstAmfNode * n3, GstAmfNode * n4,
+ GstRtmpCommandCallback response_command, gpointer user_data)
+{
+ GstRtmpChunk *chunk;
+
+ chunk = gst_rtmp_chunk_new ();
+ chunk->stream_id = chunk_stream_id;
+ chunk->timestamp = 0; /* FIXME */
+ chunk->message_type_id = 0x14;
+ chunk->info = stream_id;
+
+ chunk->payload = gst_amf_serialize_command2 (command_name, transaction_id,
+ command_object, optional_args, n3, n4);
+ chunk->message_length = g_bytes_get_size (chunk->payload);
+
+ gst_rtmp_connection_queue_chunk (connection, chunk);
+
+ if (response_command) {
+ CommandCallback *callback;
+
+ callback = g_malloc0 (sizeof (CommandCallback));
+ callback->stream_id = stream_id;
+ callback->transaction_id = transaction_id;
+ callback->func = response_command;
+ callback->user_data = user_data;
+
+ connection->command_callbacks =
+ g_list_append (connection->command_callbacks, callback);
+ }
+
return transaction_id;
}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index aa2b718..3e9a2aa 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -35,6 +35,10 @@ G_BEGIN_DECLS
typedef struct _GstRtmpConnection GstRtmpConnection;
typedef struct _GstRtmpConnectionClass GstRtmpConnectionClass;
typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection *connection);
+typedef void (*GstRtmpCommandCallback) (GstRtmpConnection *connection,
+ GstRtmpChunk *chunk, const char *command_name, int transaction_id,
+ GstAmfNode *command_object, GstAmfNode *optional_args,
+ gpointer user_data);
struct _GstRtmpConnection
{
@@ -42,6 +46,7 @@ struct _GstRtmpConnection
/* should be properties */
gboolean input_paused;
+ gboolean closed;
/* private */
GSocketConnection *connection;
@@ -60,6 +65,7 @@ struct _GstRtmpConnection
gboolean handshake_complete;
GstRtmpChunkCache *input_chunk_cache;
GstRtmpChunkCache *output_chunk_cache;
+ GList *command_callbacks;
/* chunk currently being written */
GstRtmpChunk *output_chunk;
@@ -75,11 +81,10 @@ struct _GstRtmpConnectionClass
GObjectClass object_class;
/* signals */
- void (*got_chunk) (GstRtmpConnection *connection,
+ void (*got_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk);
+ void (*got_control_chunk) (GstRtmpConnection *connection,
GstRtmpChunk *chunk);
- void (*got_command) (GstRtmpConnection *connection,
- const char *command_name, int transaction_id,
- GstAmfNode *command_object, GstAmfNode *option_args);
+ void (*closed) (GstRtmpConnection *connection);
};
GType gst_rtmp_connection_get_type (void);
@@ -94,7 +99,13 @@ void gst_rtmp_connection_dump (GstRtmpConnection *connection);
int gst_rtmp_connection_send_command (GstRtmpConnection *connection,
int stream_id, const char *command_name, int transaction_id,
- GstAmfNode *command_object, GstAmfNode *optional_args);
+ GstAmfNode *command_object, GstAmfNode *optional_args,
+ GstRtmpCommandCallback response_command, gpointer user_data);
+int gst_rtmp_connection_send_command2 (GstRtmpConnection *connection,
+ int chunk_stream_id, int stream_id, const char *command_name,
+ int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args,
+ GstAmfNode *n3, GstAmfNode *n4,
+ GstRtmpCommandCallback response_command, gpointer user_data);
G_END_DECLS
diff --git a/tools/client-test.c b/tools/client-test.c
index 7a9efbe..1250526 100644
--- a/tools/client-test.c
+++ b/tools/client-test.c
@@ -24,6 +24,7 @@
#include <gst/gst.h>
#include <gio/gio.h>
#include <stdlib.h>
+#include <string.h>
#include "rtmpclient.h"
#include "rtmputils.h"
@@ -44,9 +45,20 @@ static GOptionEntry entries[] = {
static void connect_done (GObject * source, GAsyncResult * result,
gpointer user_data);
+static void cmd_connect_done (GstRtmpConnection *connection, GstRtmpChunk *chunk,
+ const char *command_name, int transaction_id, GstAmfNode *command_object,
+ GstAmfNode *optional_args, gpointer user_data);
static void send_connect (void);
static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data);
+static void send_create_stream (void);
+static void create_stream_done (GstRtmpConnection *connection, GstRtmpChunk *chunk,
+ const char *command_name, int transaction_id, GstAmfNode *command_object,
+ GstAmfNode *optional_args, gpointer user_data);
+static void send_play (void);
+static void play_done (GstRtmpConnection *connection, GstRtmpChunk *chunk,
+ const char *command_name, int transaction_id, GstAmfNode *command_object,
+ GstAmfNode *optional_args, gpointer user_data);
int
main (int argc, char *argv[])
@@ -68,7 +80,7 @@ main (int argc, char *argv[])
client = gst_rtmp_client_new ();
gst_rtmp_client_set_server_address (client,
- "ec2-54-189-67-158.us-west-2.compute.amazonaws.com");
+ "ec2-54-185-55-241.us-west-2.compute.amazonaws.com");
cancellable = g_cancellable_new ();
main_loop = g_main_loop_new (NULL, TRUE);
@@ -91,10 +103,9 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
if (!ret) {
GST_ERROR ("error: %s", error->message);
g_error_free (error);
+ return;
}
- GST_ERROR ("got here");
-
connection = gst_rtmp_client_get_connection (client);
g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL);
@@ -102,18 +113,6 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
}
static void
-send_connect (void)
-{
- GstAmfNode *node;
-
- node = gst_amf_node_new (GST_AMF_TYPE_OBJECT);
- gst_amf_object_set_string (node, "app", "live");
- gst_amf_object_set_string (node, "type", "nonprivate");
- gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live");
- gst_rtmp_connection_send_command (connection, 3, "connect", 1, node, NULL);
-}
-
-static void
dump_command (GstRtmpChunk * chunk)
{
GstAmfNode *amf;
@@ -143,7 +142,10 @@ dump_chunk (GstRtmpChunk * chunk, gboolean dir)
chunk->stream_id,
chunk->timestamp,
chunk->message_length, chunk->message_type_id, chunk->info);
- if (chunk->stream_id == 3 && chunk->message_type_id == 20) {
+ if (chunk->message_type_id == 20) {
+ dump_command (chunk);
+ }
+ if (chunk->message_type_id == 18) {
dump_command (chunk);
}
gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk));
@@ -153,5 +155,119 @@ static void
got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data)
{
- dump_chunk (chunk, TRUE);
+ dump_chunk (chunk, FALSE);
+}
+
+static void
+send_connect (void)
+{
+ GstAmfNode *node;
+
+ node = gst_amf_node_new (GST_AMF_TYPE_OBJECT);
+ gst_amf_object_set_string (node, "app", "live");
+ gst_amf_object_set_string (node, "type", "nonprivate");
+ gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live");
+ // "fpad": False,
+ // "capabilities": 15,
+ // "audioCodecs": 3191,
+ // "videoCodecs": 252,
+ // "videoFunction": 1,
+ gst_rtmp_connection_send_command (connection, 3, "connect", 1, node, NULL, cmd_connect_done, NULL);
+}
+
+static void
+cmd_connect_done (GstRtmpConnection *connection, GstRtmpChunk *chunk,
+ const char *command_name, int transaction_id, GstAmfNode *command_object,
+ GstAmfNode *optional_args, gpointer user_data)
+{
+ gboolean ret;
+
+ ret = FALSE;
+ if (optional_args) {
+ const GstAmfNode *n;
+ n = gst_amf_node_get_object (optional_args, "code");
+ if (n) {
+ const char *s;
+ s = gst_amf_node_get_string (n);
+ if (strcmp (s, "NetConnection.Connect.Success") == 0) {
+ ret = TRUE;
+ }
+ }
+ }
+
+ if (ret) {
+ GST_ERROR("success");
+
+ send_create_stream ();
+ }
+}
+
+static void
+send_create_stream (void)
+{
+ GstAmfNode *node;
+
+ node = gst_amf_node_new (GST_AMF_TYPE_NULL);
+ gst_rtmp_connection_send_command (connection, 3, "createStream", 2, node, NULL, create_stream_done, NULL);
+
+}
+
+static void
+create_stream_done (GstRtmpConnection *connection, GstRtmpChunk *chunk,
+ const char *command_name, int transaction_id, GstAmfNode *command_object,
+ GstAmfNode *optional_args, gpointer user_data)
+{
+ gboolean ret;
+ int stream_id;
+
+ ret = FALSE;
+ if (optional_args) {
+ stream_id = gst_amf_node_get_number (optional_args);
+ ret = TRUE;
+ }
+
+ if (ret) {
+ GST_ERROR("createStream success, stream_id=%d", stream_id);
+
+ send_play ();
+ }
+}
+
+static void
+send_play (void)
+{
+ GstAmfNode *n1;
+ GstAmfNode *n2;
+ GstAmfNode *n3;
+
+ n1 = gst_amf_node_new (GST_AMF_TYPE_NULL);
+ n2 = gst_amf_node_new (GST_AMF_TYPE_STRING);
+ gst_amf_node_set_string (n2, "myStream");
+ n3 = gst_amf_node_new (GST_AMF_TYPE_NUMBER);
+ gst_amf_node_set_number (n3, 0);
+ gst_rtmp_connection_send_command2 (connection, 8, 1, "play", 3, n1, n2, n3, NULL, play_done, NULL);
+
+}
+
+static void
+play_done (GstRtmpConnection *connection, GstRtmpChunk *chunk,
+ const char *command_name, int transaction_id, GstAmfNode *command_object,
+ GstAmfNode *optional_args, gpointer user_data)
+{
+ gboolean ret;
+ int stream_id;
+
+ ret = FALSE;
+ if (optional_args) {
+ stream_id = gst_amf_node_get_number (optional_args);
+ ret = TRUE;
+ }
+
+ if (ret) {
+ GST_ERROR("play success, stream_id=%d", stream_id);
+
+ }
}
+
+
+
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 0ab122a..0d78fcb 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -85,7 +85,7 @@ main (int argc, char *argv[])
client = gst_rtmp_client_new ();
gst_rtmp_client_set_server_address (client,
- "ec2-54-189-67-158.us-west-2.compute.amazonaws.com");
+ "ec2-54-185-55-241.us-west-2.compute.amazonaws.com");
cancellable = g_cancellable_new ();
if (verbose)
@@ -219,7 +219,7 @@ dump_chunk (GstRtmpChunk * chunk, gboolean dir)
chunk->stream_id,
chunk->timestamp,
chunk->message_length, chunk->message_type_id, chunk->info);
- if (chunk->stream_id == 3 && chunk->message_type_id == 20) {
+ if (chunk->message_type_id == 0x14 || chunk->message_type_id == 0x18) {
dump_command (chunk);
}
gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk));