diff options
author | David Schleef <ds@schleef.org> | 2014-08-30 13:17:13 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-30 13:17:13 -0700 |
commit | cff0d57190f3f0b9c9d0c30012b959f12055da08 (patch) | |
tree | 2eadf183af3c3d6b1062eebd3f06053be16e0378 | |
parent | bf029aebe192f30de5d79281d0b8c44f94cfd233 (diff) |
hacking
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | plugins/Makefile.am | 6 | ||||
-rw-r--r-- | plugins/gstrtmp2.c | 8 | ||||
-rw-r--r-- | plugins/gstrtmp2sink.c | 5 | ||||
-rw-r--r-- | plugins/gstrtmp2src.c | 372 | ||||
-rw-r--r-- | plugins/gstrtmp2src.h | 20 | ||||
-rw-r--r-- | rtmp/amf.c | 81 | ||||
-rw-r--r-- | rtmp/amf.h | 10 | ||||
-rw-r--r-- | rtmp/rtmpchunk.c | 63 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 5 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 1 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 144 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 21 | ||||
-rw-r--r-- | tools/client-test.c | 150 | ||||
-rw-r--r-- | tools/proxy-server.c | 4 |
15 files changed, 755 insertions, 137 deletions
diff --git a/configure.ac b/configure.ac index 357f186..3190a98 100644 --- a/configure.ac +++ b/configure.ac @@ -131,7 +131,7 @@ AM_CONDITIONAL(HAVE_FALSE, false) GST_RTMP_CFLAGS="$GST_RTMP_CFLAGS -I\$(top_srcdir)" AC_SUBST(GST_RTMP_CFLAGS) -GST_RTMP_LIBS="\$(top_builddir)/gst-rtmp/libgss-$GST_API_VERSION.la" +GST_RTMP_LIBS="\$(top_builddir)/rtmp/libgstrtmp-$GST_API_VERSION.la" AC_SUBST(GST_RTMP_LIBS) diff --git a/plugins/Makefile.am b/plugins/Makefile.am index 43990ca..885da90 100644 --- a/plugins/Makefile.am +++ b/plugins/Makefile.am @@ -1,13 +1,13 @@ -plugindir = ${prefix}/gstreamer-1.0 +plugindir = ${libdir}/gstreamer-1.0 plugin_LTLIBRARIES = libgstrtmp2.la libgstrtmp2_la_SOURCES = gstrtmp2src.c gstrtmp2sink.c gstrtmp2.c noinst_HEADERS = gstrtmp2src.h gstrtmp2sink.h -libgstrtmp2_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(RTMP_CFLAGS) -libgstrtmp2_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS) $(RTMP_LIBS) $(WINSOCK2_LIBS) +libgstrtmp2_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GST_RTMP_CFLAGS) +libgstrtmp2_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS) $(GST_RTMP_LIBS) $(WINSOCK2_LIBS) libgstrtmp2_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) libgstrtmp2_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) diff --git a/plugins/gstrtmp2.c b/plugins/gstrtmp2.c index dc82302..38da527 100644 --- a/plugins/gstrtmp2.c +++ b/plugins/gstrtmp2.c @@ -25,14 +25,12 @@ #include "gstrtmp2src.h" #include "gstrtmp2sink.h" -GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category); -#define GST_CAT_DEFAULT gst_rtmp2_src_debug_category - static gboolean plugin_init (GstPlugin * plugin) { - gst_element_register (plugin, "rtmp2src", GST_RANK_NONE, GST_TYPE_RTMP2_SRC); - gst_element_register (plugin, "rtmp2sink", GST_RANK_NONE, + gst_element_register (plugin, "rtmp2src", GST_RANK_PRIMARY + 1, + GST_TYPE_RTMP2_SRC); + gst_element_register (plugin, "rtmp2sink", GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SINK); return TRUE; diff --git a/plugins/gstrtmp2sink.c b/plugins/gstrtmp2sink.c index eacb268..b6d4713 100644 --- a/plugins/gstrtmp2sink.c +++ b/plugins/gstrtmp2sink.c @@ -19,7 +19,8 @@ /** * SECTION:element-gstrtmp2sink * - * The rtmp2sink element does FIXME stuff. + * The rtmp2sink element sends audio and video streams to an RTMP + * server. * * <refsect2> * <title>Example launch line</title> @@ -90,7 +91,7 @@ static GstStaticPadTemplate gst_rtmp2_sink_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/unknown") + GST_STATIC_CAPS ("video/x-flv") ); diff --git a/plugins/gstrtmp2src.c b/plugins/gstrtmp2src.c index 6b281c2..bca8ba4 100644 --- a/plugins/gstrtmp2src.c +++ b/plugins/gstrtmp2src.c @@ -19,12 +19,12 @@ /** * SECTION:element-gstrtmp2src * - * The rtmp2src element does FIXME stuff. + * The rtmp2src element receives input streams from an RTMP server. * * <refsect2> * <title>Example launch line</title> * |[ - * gst-launch -v fakesrc ! rtmp2src ! FIXME ! fakesink + * gst-launch -v rtmp2src ! decodebin ! fakesink * ]| * FIXME Describe what the pipeline does. * </refsect2> @@ -36,7 +36,10 @@ #include <gst/gst.h> #include <gst/base/gstbasesrc.h> +#include <gst/base/gstpushsrc.h> #include "gstrtmp2src.h" +#include <rtmp/rtmpchunk.h> +#include <string.h> GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category); #define GST_CAT_DEFAULT gst_rtmp2_src_debug_category @@ -50,13 +53,10 @@ static void gst_rtmp2_src_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp2_src_dispose (GObject * object); static void gst_rtmp2_src_finalize (GObject * object); +static void gst_rtmp2_src_uri_handler_init (gpointer g_iface, + gpointer iface_data); -static GstCaps *gst_rtmp2_src_get_caps (GstBaseSrc * src, GstCaps * filter); -static gboolean gst_rtmp2_src_negotiate (GstBaseSrc * src); -static GstCaps *gst_rtmp2_src_fixate (GstBaseSrc * src, GstCaps * caps); -static gboolean gst_rtmp2_src_set_caps (GstBaseSrc * src, GstCaps * caps); -static gboolean gst_rtmp2_src_decide_allocation (GstBaseSrc * src, - GstQuery * query); +static void gst_rtmp2_src_task (gpointer user_data); static gboolean gst_rtmp2_src_start (GstBaseSrc * src); static gboolean gst_rtmp2_src_stop (GstBaseSrc * src); static void gst_rtmp2_src_get_times (GstBaseSrc * src, GstBuffer * buffer, @@ -74,8 +74,25 @@ static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, GstBuffer ** buf); static GstFlowReturn gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset, guint size, GstBuffer ** buf); -static GstFlowReturn gst_rtmp2_src_fill (GstBaseSrc * src, guint64 offset, - guint size, GstBuffer * buf); + +static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data); +static void connect_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void send_connect (GstRtmp2Src * src); +static void cmd_connect_done (GstRtmpConnection * connection, + GstRtmpChunk * chunk, const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args, + gpointer user_data); +static void send_create_stream (GstRtmp2Src * src); +static void create_stream_done (GstRtmpConnection * connection, + GstRtmpChunk * chunk, const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args, + gpointer user_data); +static void send_play (GstRtmp2Src * src); +static void play_done (GstRtmpConnection * connection, GstRtmpChunk * chunk, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, gpointer user_data); enum { @@ -88,15 +105,18 @@ static GstStaticPadTemplate gst_rtmp2_src_src_template = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/unknown") + GST_STATIC_CAPS ("video/x-flv") ); /* class initialization */ -G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_BASE_SRC, - GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, - "debug category for rtmp2src element")); +G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC, + do { + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, + gst_rtmp2_src_uri_handler_init); + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, + "debug category for rtmp2src element");} while (0)); static void gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) @@ -110,19 +130,13 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) gst_static_pad_template_get (&gst_rtmp2_src_src_template)); gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass), - "FIXME Long name", "Generic", "FIXME Description", - "FIXME <fixme@example.com>"); + "RTMP source element", "Source", "Source element for RTMP streams", + "David Schleef <ds@schleef.org>"); gobject_class->set_property = gst_rtmp2_src_set_property; gobject_class->get_property = gst_rtmp2_src_get_property; gobject_class->dispose = gst_rtmp2_src_dispose; gobject_class->finalize = gst_rtmp2_src_finalize; - base_src_class->get_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_src_get_caps); - base_src_class->negotiate = GST_DEBUG_FUNCPTR (gst_rtmp2_src_negotiate); - base_src_class->fixate = GST_DEBUG_FUNCPTR (gst_rtmp2_src_fixate); - base_src_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_src_set_caps); - base_src_class->decide_allocation = - GST_DEBUG_FUNCPTR (gst_rtmp2_src_decide_allocation); base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start); base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop); base_src_class->get_times = GST_DEBUG_FUNCPTR (gst_rtmp2_src_get_times); @@ -133,17 +147,70 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) base_src_class->do_seek = GST_DEBUG_FUNCPTR (gst_rtmp2_src_do_seek); base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock); base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop); - base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query); + if (0) + base_src_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_src_query); base_src_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_src_event); base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create); base_src_class->alloc = GST_DEBUG_FUNCPTR (gst_rtmp2_src_alloc); - base_src_class->fill = GST_DEBUG_FUNCPTR (gst_rtmp2_src_fill); } static void gst_rtmp2_src_init (GstRtmp2Src * rtmp2src) { + g_mutex_init (&rtmp2src->lock); + rtmp2src->queue = g_queue_new (); + + //gst_base_src_set_live (GST_BASE_SRC(rtmp2src), TRUE); + + rtmp2src->task = gst_task_new (gst_rtmp2_src_task, rtmp2src, NULL); + g_rec_mutex_init (&rtmp2src->task_lock); + gst_task_set_lock (rtmp2src->task, &rtmp2src->task_lock); + rtmp2src->client = gst_rtmp_client_new (); + gst_rtmp_client_set_server_address (rtmp2src->client, + "ec2-54-185-55-241.us-west-2.compute.amazonaws.com"); + +} + +static GstURIType +gst_rtmp2_src_uri_get_type (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +gst_rtmp2_src_uri_get_protocols (GType type) +{ + static const gchar *protocols[] = { "rtmp", NULL }; + + return protocols; +} + +static gchar * +gst_rtmp2_src_uri_get_uri (GstURIHandler * handler) +{ + GstRtmp2Src *src = GST_RTMP2_SRC (handler); + + /* FIXME: make thread-safe */ + return g_strdup (src->uri); +} + +static gboolean +gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, + GError ** error) +{ + return TRUE; +} + +static void +gst_rtmp2_src_uri_handler_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_rtmp2_src_uri_get_type; + iface->get_protocols = gst_rtmp2_src_uri_get_protocols; + iface->get_uri = gst_rtmp2_src_uri_get_uri; + iface->set_uri = gst_rtmp2_src_uri_set_uri; } void @@ -196,74 +263,227 @@ gst_rtmp2_src_finalize (GObject * object) GST_DEBUG_OBJECT (rtmp2src, "finalize"); /* clean up object here */ + g_object_unref (rtmp2src->task); + g_rec_mutex_clear (&rtmp2src->task_lock); + g_object_unref (rtmp2src->client); + g_mutex_clear (&rtmp2src->lock); + g_queue_free_full (rtmp2src->queue, g_object_unref); G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object); } -/* get caps from subclass */ -static GstCaps * -gst_rtmp2_src_get_caps (GstBaseSrc * src, GstCaps * filter) +/* start and stop processing, ideal for opening/closing the resource */ +static gboolean +gst_rtmp2_src_start (GstBaseSrc * src) { GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); - GST_DEBUG_OBJECT (rtmp2src, "get_caps"); + GST_DEBUG_OBJECT (rtmp2src, "start"); - return NULL; + gst_task_start (rtmp2src->task); + + return TRUE; } -/* decide on caps */ -static gboolean -gst_rtmp2_src_negotiate (GstBaseSrc * src) +static void +gst_rtmp2_src_task (gpointer user_data) { - GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); + GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data); + GMainLoop *main_loop; + GMainContext *main_context; + + gst_rtmp_client_connect_async (rtmp2src->client, NULL, connect_done, + rtmp2src); + + main_context = g_main_context_new (); + main_loop = g_main_loop_new (main_context, TRUE); + g_main_loop_run (main_loop); + g_main_loop_unref (main_loop); + g_main_context_unref (main_context); +} - GST_DEBUG_OBJECT (rtmp2src, "negotiate"); +static void +connect_done (GObject * source, GAsyncResult * result, gpointer user_data) +{ + GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data); + GError *error = NULL; + gboolean ret; + + ret = gst_rtmp_client_connect_finish (rtmp2src->client, result, &error); + if (!ret) { + GST_ERROR ("error: %s", error->message); + g_error_free (error); + return; + } - return TRUE; + rtmp2src->connection = gst_rtmp_client_get_connection (rtmp2src->client); + g_signal_connect (rtmp2src->connection, "got-chunk", G_CALLBACK (got_chunk), + rtmp2src); + + send_connect (rtmp2src); } -/* called if, in negotiation, caps need fixating */ -static GstCaps * -gst_rtmp2_src_fixate (GstBaseSrc * src, GstCaps * caps) +static void +dump_command (GstRtmpChunk * chunk) { - GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); + GstAmfNode *amf; + gsize size; + const guint8 *data; + gsize n_parsed; + int offset; + + offset = 0; + data = g_bytes_get_data (chunk->payload, &size); + while (offset < size) { + amf = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + gst_amf_node_dump (amf); + gst_amf_node_free (amf); + offset += n_parsed; + } +} + +static void +dump_chunk (GstRtmpChunk * chunk, gboolean dir) +{ + g_print ("%s stream_id:%-4d ts:%-8d len:%-6" G_GSIZE_FORMAT + " type_id:%-4d info:%08x\n", dir ? ">>>" : "<<<", + chunk->stream_id, + chunk->timestamp, + chunk->message_length, chunk->message_type_id, chunk->info); + if (chunk->message_type_id == 20) { + dump_command (chunk); + } + if (chunk->message_type_id == 18) { + dump_command (chunk); + } + gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk)); +} - GST_DEBUG_OBJECT (rtmp2src, "fixate"); +static void +got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data) +{ + GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data); + if (rtmp2src->dump) { + dump_chunk (chunk, FALSE); + } +} - return NULL; +static void +send_connect (GstRtmp2Src * rtmp2src) +{ + GstAmfNode *node; + + node = gst_amf_node_new (GST_AMF_TYPE_OBJECT); + gst_amf_object_set_string (node, "app", "live"); + gst_amf_object_set_string (node, "type", "nonprivate"); + gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live"); + // "fpad": False, + // "capabilities": 15, + // "audioCodecs": 3191, + // "videoCodecs": 252, + // "videoFunction": 1, + gst_rtmp_connection_send_command (rtmp2src->connection, 3, "connect", 1, node, + NULL, cmd_connect_done, rtmp2src); } -/* notify the subclass of new caps */ -static gboolean -gst_rtmp2_src_set_caps (GstBaseSrc * src, GstCaps * caps) +static void +cmd_connect_done (GstRtmpConnection * connection, GstRtmpChunk * chunk, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, gpointer user_data) { - GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); + GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data); + gboolean ret; + + ret = FALSE; + if (optional_args) { + const GstAmfNode *n; + n = gst_amf_node_get_object (optional_args, "code"); + if (n) { + const char *s; + s = gst_amf_node_get_string (n); + if (strcmp (s, "NetConnection.Connect.Success") == 0) { + ret = TRUE; + } + } + } - GST_DEBUG_OBJECT (rtmp2src, "set_caps"); + if (ret) { + GST_ERROR ("success"); - return TRUE; + send_create_stream (rtmp2src); + } } -/* setup allocation query */ -static gboolean -gst_rtmp2_src_decide_allocation (GstBaseSrc * src, GstQuery * query) +static void +send_create_stream (GstRtmp2Src * rtmp2src) { - GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); + GstAmfNode *node; - GST_DEBUG_OBJECT (rtmp2src, "decide_allocation"); + node = gst_amf_node_new (GST_AMF_TYPE_NULL); + gst_rtmp_connection_send_command (rtmp2src->connection, 3, "createStream", 2, + node, NULL, create_stream_done, rtmp2src); - return TRUE; } -/* start and stop processing, ideal for opening/closing the resource */ -static gboolean -gst_rtmp2_src_start (GstBaseSrc * src) +static void +create_stream_done (GstRtmpConnection * connection, GstRtmpChunk * chunk, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, gpointer user_data) { - GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); + GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data); + gboolean ret; + int stream_id; + + ret = FALSE; + if (optional_args) { + stream_id = gst_amf_node_get_number (optional_args); + ret = TRUE; + } - GST_DEBUG_OBJECT (rtmp2src, "start"); + if (ret) { + GST_ERROR ("createStream success, stream_id=%d", stream_id); - return TRUE; + send_play (rtmp2src); + } +} + +static void +send_play (GstRtmp2Src * rtmp2src) +{ + GstAmfNode *n1; + GstAmfNode *n2; + GstAmfNode *n3; + + n1 = gst_amf_node_new (GST_AMF_TYPE_NULL); + n2 = gst_amf_node_new (GST_AMF_TYPE_STRING); + gst_amf_node_set_string (n2, "myStream"); + n3 = gst_amf_node_new (GST_AMF_TYPE_NUMBER); + gst_amf_node_set_number (n3, 0); + gst_rtmp_connection_send_command2 (rtmp2src->connection, 8, 1, "play", 3, n1, + n2, n3, NULL, play_done, rtmp2src); + +} + +static void +play_done (GstRtmpConnection * connection, GstRtmpChunk * chunk, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, gpointer user_data) +{ + //GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (user_data); + gboolean ret; + int stream_id; + + ret = FALSE; + if (optional_args) { + stream_id = gst_amf_node_get_number (optional_args); + ret = TRUE; + } + + if (ret) { + GST_ERROR ("play success, stream_id=%d", stream_id); + + } } static gboolean @@ -307,7 +527,7 @@ gst_rtmp2_src_is_seekable (GstBaseSrc * src) GST_DEBUG_OBJECT (rtmp2src, "is_seekable"); - return TRUE; + return FALSE; } /* Prepare the segment on which to perform do_seek(), converting to the @@ -386,9 +606,29 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, GstBuffer ** buf) { GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); + GstRtmpChunk *chunk; + const char *data; + gsize payload_size; GST_DEBUG_OBJECT (rtmp2src, "create"); + g_mutex_lock (&rtmp2src->lock); + chunk = g_queue_pop_head (rtmp2src->queue); + while (!chunk) { + if (rtmp2src->reset) { + g_mutex_unlock (&rtmp2src->lock); + return GST_FLOW_ERROR; + } + g_cond_wait (&rtmp2src->cond, &rtmp2src->lock); + chunk = g_queue_pop_head (rtmp2src->queue); + } + g_mutex_unlock (&rtmp2src->lock); + + data = g_bytes_get_data (chunk->payload, &payload_size); + *buf = + gst_buffer_new_wrapped_full (0, (gpointer) data, payload_size, 0, + payload_size, chunk, g_object_unref); + return GST_FLOW_OK; } @@ -404,15 +644,3 @@ gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset, guint size, return GST_FLOW_OK; } - -/* ask the subclass to fill the buffer with data from offset and size */ -static GstFlowReturn -gst_rtmp2_src_fill (GstBaseSrc * src, guint64 offset, guint size, - GstBuffer * buf) -{ - GstRtmp2Src *rtmp2src = GST_RTMP2_SRC (src); - - GST_DEBUG_OBJECT (rtmp2src, "fill"); - - return GST_FLOW_OK; -} diff --git a/plugins/gstrtmp2src.h b/plugins/gstrtmp2src.h index 59d7511..6529a6e 100644 --- a/plugins/gstrtmp2src.h +++ b/plugins/gstrtmp2src.h @@ -20,7 +20,9 @@ #ifndef _GST_RTMP2_SRC_H_ #define _GST_RTMP2_SRC_H_ -#include <gst/base/gstbasesrc.h> +#include <gst/base/gstpushsrc.h> +#include <rtmp/rtmpclient.h> +#include <rtmp/rtmputils.h> G_BEGIN_DECLS @@ -35,13 +37,25 @@ typedef struct _GstRtmp2SrcClass GstRtmp2SrcClass; struct _GstRtmp2Src { - GstBaseSrc base_rtmp2src; + GstPushSrc base_rtmp2src; + char *uri; + + GMutex lock; + GCond cond; + GQueue *queue; + gboolean reset; + GstTask *task; + GRecMutex task_lock; + + GstRtmpClient *client; + GstRtmpConnection *connection; + gboolean dump; }; struct _GstRtmp2SrcClass { - GstBaseSrcClass base_rtmp2src_class; + GstPushSrcClass base_rtmp2src_class; }; GType gst_rtmp2_src_get_type (void); @@ -552,11 +552,13 @@ gst_amf_serialize_command (const char *command_name, int transaction_id, serializer->size = 4096; serializer->data = g_malloc (serializer->size); + _serialize_u8 (serializer, GST_AMF_TYPE_STRING); _serialize_utf8_string (serializer, command_name); + _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER); _serialize_number (serializer, transaction_id); - _serialize_object (serializer, command_object); + _serialize_value (serializer, command_object); if (optional_args) - _serialize_object (serializer, optional_args); + _serialize_value (serializer, optional_args); if (serializer->error) { GST_ERROR ("failed to serialize"); @@ -565,3 +567,78 @@ gst_amf_serialize_command (const char *command_name, int transaction_id, } return g_bytes_new_take (serializer->data, serializer->offset); } + +GBytes * +gst_amf_serialize_command2 (const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args, GstAmfNode * n3, + GstAmfNode * n4) +{ + AmfSerializer _s = { 0 }, *serializer = &_s; + + serializer->size = 4096; + serializer->data = g_malloc (serializer->size); + + _serialize_u8 (serializer, GST_AMF_TYPE_STRING); + _serialize_utf8_string (serializer, command_name); + _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER); + _serialize_number (serializer, transaction_id); + _serialize_value (serializer, command_object); + if (optional_args) + _serialize_value (serializer, optional_args); + if (n3) + _serialize_value (serializer, n3); + if (n4) + _serialize_value (serializer, n4); + + if (serializer->error) { + GST_ERROR ("failed to serialize"); + g_free (serializer->data); + return NULL; + } + return g_bytes_new_take (serializer->data, serializer->offset); +} + +gboolean +gst_amf_node_get_boolean (const GstAmfNode * node) +{ + return node->int_val; +} + +const char * +gst_amf_node_get_string (const GstAmfNode * node) +{ + return node->string_val; +} + +double +gst_amf_node_get_number (const GstAmfNode * node) +{ + return node->double_val; +} + +const GstAmfNode * +gst_amf_node_get_object (const GstAmfNode * node, const char *field_name) +{ + int i; + for (i = 0; i < node->array_val->len; i++) { + AmfObjectField *field = g_ptr_array_index (node->array_val, i); + if (strcmp (field->name, field_name) == 0) { + return field->value; + } + } + return NULL; +} + +int +gst_amf_node_get_object_length (const GstAmfNode * node) +{ + return node->array_val->len; +} + +const GstAmfNode * +gst_amf_node_get_object_by_index (const GstAmfNode * node, int i) +{ + AmfObjectField *field; + field = g_ptr_array_index (node->array_val, i); + return field->value; +} @@ -70,6 +70,13 @@ void gst_amf_node_set_ecma_array (GstAmfNode *node, guint8 *data, int size); void gst_amf_object_append_take (GstAmfNode *node, const char *s, GstAmfNode *child_node); +gboolean gst_amf_node_get_boolean (const GstAmfNode *node); +const char *gst_amf_node_get_string (const GstAmfNode *node); +double gst_amf_node_get_number (const GstAmfNode *node); +const GstAmfNode *gst_amf_node_get_object (const GstAmfNode *node, const char *field_name); +int gst_amf_node_get_object_length (const GstAmfNode *node); +const GstAmfNode *gst_amf_node_get_object_by_index (const GstAmfNode *node, int i); + void gst_amf_object_set_number (GstAmfNode *node, const char *field_name, double val); void gst_amf_object_set_string (GstAmfNode *node, const char *field_name, @@ -77,6 +84,9 @@ void gst_amf_object_set_string (GstAmfNode *node, const char *field_name, GBytes * gst_amf_serialize_command (const char *command_name, int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args); +GBytes * gst_amf_serialize_command2 (const char *command_name, + int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args, + GstAmfNode *n3, GstAmfNode *n4); G_END_DECLS diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index b23fe45..6a22c9e 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -193,8 +193,8 @@ gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes, header->message_type_id = data[offset]; offset += 1; - header->info = (data[offset] << 24) | (data[offset + 1] << 16) | - (data[offset + 2] << 8) | data[offset + 3]; + header->info = (data[offset + 3] << 24) | (data[offset + 2] << 16) | + (data[offset + 1] << 8) | data[offset]; offset += 4; } else { header->timestamp = previous_header->timestamp; @@ -262,10 +262,10 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, data[5] = (chunk->message_length >> 8) & 0xff; data[6] = chunk->message_length & 0xff; data[7] = chunk->message_type_id; - data[8] = (chunk->info >> 24) & 0xff; - data[9] = (chunk->info >> 16) & 0xff; - data[10] = (chunk->info >> 8) & 0xff; - data[11] = chunk->info & 0xff; + data[8] = chunk->info & 0xff; + data[9] = (chunk->info >> 8) & 0xff; + data[10] = (chunk->info >> 16) & 0xff; + data[11] = (chunk->info >> 24) & 0xff; offset = 12; } else { data[1] = (timestamp >> 16) & 0xff; @@ -377,3 +377,54 @@ gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry, entry->previous_header.message_type_id = chunk->message_type_id; entry->previous_header.info = chunk->info; } + +gboolean +gst_rtmp_chunk_parse_message (GstRtmpChunk * chunk, char **command_name, + double *transaction_id, GstAmfNode ** command_object, + GstAmfNode ** optional_args) +{ + gsize n_parsed; + const guint8 *data; + gsize size; + int offset; + GstAmfNode *n1, *n2, *n3, *n4; + + offset = 0; + data = g_bytes_get_data (chunk->payload, &size); + n1 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + offset += n_parsed; + n2 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + offset += n_parsed; + n3 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + offset += n_parsed; + if (offset < size) { + n4 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + } else { + n4 = NULL; + } + + if (command_name) { + *command_name = g_strdup (gst_amf_node_get_string (n1)); + } + gst_amf_node_free (n1); + + if (transaction_id) { + *transaction_id = gst_amf_node_get_number (n2); + } + gst_amf_node_free (n2); + + if (command_object) { + *command_object = n3; + } else { + gst_amf_node_free (n3); + } + + if (optional_args) { + *optional_args = n4; + } else { + if (n4) + gst_amf_node_free (n4); + } + + return TRUE; +} diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 2fa75ca..8b2b707 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -21,6 +21,7 @@ #define _GST_RTMP_CHUNK_H_ #include <glib.h> +#include "rtmp/amf.h" G_BEGIN_DECLS @@ -99,6 +100,10 @@ GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk); gboolean gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader *header, GBytes * bytes); gboolean gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader *header, GBytes * bytes, GstRtmpChunkHeader *previous_header); +gboolean gst_rtmp_chunk_parse_message (GstRtmpChunk *chunk, + char **command_name, double *transaction_id, + GstAmfNode **command_object, GstAmfNode **optional_args); + /* chunk cache */ diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 8b9eae8..241161e 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -206,6 +206,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, addr = g_network_address_new (client->server_address, client->server_port); client->socket_client = g_socket_client_new (); + g_socket_client_set_timeout (client->socket_client, 5); GST_DEBUG ("g_socket_client_connect_async"); g_socket_client_connect_async (client->socket_client, addr, diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 5ac713a..4952d29 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -40,6 +40,7 @@ static void gst_rtmp_connection_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp_connection_dispose (GObject * object); static void gst_rtmp_connection_finalize (GObject * object); +static void gst_rtmp_connection_got_closed (GstRtmpConnection * connection); static gboolean gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data); static gboolean gst_rtmp_connection_output_ready (GOutputStream * os, @@ -70,10 +71,20 @@ static void gst_rtmp_connection_start_output (GstRtmpConnection * sc); static void gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, GstRtmpChunk * chunk); - +static void gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc, + GstRtmpChunk * chunk); static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); +typedef struct _CommandCallback CommandCallback; +struct _CommandCallback +{ + int stream_id; + int transaction_id; + GstRtmpCommandCallback func; + gpointer user_data; +}; + enum { PROP_0 @@ -100,6 +111,13 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, got_chunk), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); + g_signal_new ("got-control-chunk", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, + got_control_chunk), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); + g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, closed), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0); } static void @@ -228,8 +246,7 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) return G_SOURCE_REMOVE; } if (ret == 0) { - /* FIXME probably closed */ - GST_ERROR ("closed?"); + gst_rtmp_connection_got_closed (sc); return G_SOURCE_REMOVE; } @@ -294,6 +311,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) } static void +gst_rtmp_connection_got_closed (GstRtmpConnection * connection) +{ + connection->closed = TRUE; + g_signal_emit_by_name (connection, "closed"); +} + +static void gst_rtmp_connection_write_chunk_done (GObject * obj, GAsyncResult * res, gpointer user_data) { @@ -309,8 +333,8 @@ gst_rtmp_connection_write_chunk_done (GObject * obj, ret = g_output_stream_write_bytes_finish (os, res, &error); if (ret < 0) { - GST_ERROR ("write error: %s", error->message); - g_assert_not_reached (); + GST_DEBUG ("write error: %s", error->message); + gst_rtmp_connection_got_closed (connection); g_error_free (error); return; } @@ -526,19 +550,7 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) header.message_length); entry->payload = NULL; - if (entry->chunk->stream_id == 0x02) { - GST_DEBUG ("got protocol control message, type: %d", - entry->chunk->message_type_id); - gst_rtmp_connection_handle_pcm (sc, entry->chunk); - g_signal_emit_by_name (sc, "got-control-chunk", entry->chunk); - } else if (entry->chunk->message_type_id == 0x14) { - /* FIXME parse command */ - g_signal_emit_by_name (sc, "got-command", entry->chunk); - } else { - GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", - entry->chunk->message_length); - g_signal_emit_by_name (sc, "got-chunk", entry->chunk); - } + gst_rtmp_connection_handle_chunk (sc, entry->chunk); g_object_unref (entry->chunk); entry->chunk = NULL; @@ -552,6 +564,48 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) } static void +gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc, GstRtmpChunk * chunk) +{ + if (chunk->stream_id == 0x02) { + GST_DEBUG ("got protocol control message, type: %d", + chunk->message_type_id); + gst_rtmp_connection_handle_pcm (sc, chunk); + g_signal_emit_by_name (sc, "got-control-chunk", chunk); + } else { + if (chunk->message_type_id == 0x14) { + CommandCallback *cb = NULL; + GList *g; + char *command_name; + double transaction_id; + GstAmfNode *command_object; + GstAmfNode *optional_args; + + gst_rtmp_chunk_parse_message (chunk, &command_name, &transaction_id, + &command_object, &optional_args); + for (g = sc->command_callbacks; g; g = g_list_next (g)) { + cb = g->data; + if (cb->stream_id == chunk->stream_id && + cb->transaction_id == transaction_id) { + break; + } + } + if (cb) { + sc->command_callbacks = g_list_remove (sc->command_callbacks, cb); + cb->func (sc, chunk, command_name, transaction_id, command_object, + optional_args, cb->user_data); + g_free (command_name); + gst_amf_node_free (command_object); + if (optional_args) + gst_amf_node_free (optional_args); + g_free (cb); + } + } + GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length); + g_signal_emit_by_name (sc, "got-chunk", chunk); + } +} + +static void gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, GstRtmpChunk * chunk) { @@ -770,7 +824,8 @@ gst_rtmp_connection_dump (GstRtmpConnection * connection) int gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id, const char *command_name, int transaction_id, GstAmfNode * command_object, - GstAmfNode * optional_args) + GstAmfNode * optional_args, GstRtmpCommandCallback response_command, + gpointer user_data) { GstRtmpChunk *chunk; @@ -782,8 +837,59 @@ gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id, chunk->payload = gst_amf_serialize_command (command_name, transaction_id, command_object, optional_args); + chunk->message_length = g_bytes_get_size (chunk->payload); gst_rtmp_connection_queue_chunk (connection, chunk); + if (response_command) { + CommandCallback *callback; + + callback = g_malloc0 (sizeof (CommandCallback)); + callback->stream_id = stream_id; + callback->transaction_id = transaction_id; + callback->func = response_command; + callback->user_data = user_data; + + connection->command_callbacks = + g_list_append (connection->command_callbacks, callback); + } + + return transaction_id; +} + +int +gst_rtmp_connection_send_command2 (GstRtmpConnection * connection, + int chunk_stream_id, int stream_id, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, GstAmfNode * n3, GstAmfNode * n4, + GstRtmpCommandCallback response_command, gpointer user_data) +{ + GstRtmpChunk *chunk; + + chunk = gst_rtmp_chunk_new (); + chunk->stream_id = chunk_stream_id; + chunk->timestamp = 0; /* FIXME */ + chunk->message_type_id = 0x14; + chunk->info = stream_id; + + chunk->payload = gst_amf_serialize_command2 (command_name, transaction_id, + command_object, optional_args, n3, n4); + chunk->message_length = g_bytes_get_size (chunk->payload); + + gst_rtmp_connection_queue_chunk (connection, chunk); + + if (response_command) { + CommandCallback *callback; + + callback = g_malloc0 (sizeof (CommandCallback)); + callback->stream_id = stream_id; + callback->transaction_id = transaction_id; + callback->func = response_command; + callback->user_data = user_data; + + connection->command_callbacks = + g_list_append (connection->command_callbacks, callback); + } + return transaction_id; } diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index aa2b718..3e9a2aa 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -35,6 +35,10 @@ G_BEGIN_DECLS typedef struct _GstRtmpConnection GstRtmpConnection; typedef struct _GstRtmpConnectionClass GstRtmpConnectionClass; typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection *connection); +typedef void (*GstRtmpCommandCallback) (GstRtmpConnection *connection, + GstRtmpChunk *chunk, const char *command_name, int transaction_id, + GstAmfNode *command_object, GstAmfNode *optional_args, + gpointer user_data); struct _GstRtmpConnection { @@ -42,6 +46,7 @@ struct _GstRtmpConnection /* should be properties */ gboolean input_paused; + gboolean closed; /* private */ GSocketConnection *connection; @@ -60,6 +65,7 @@ struct _GstRtmpConnection gboolean handshake_complete; GstRtmpChunkCache *input_chunk_cache; GstRtmpChunkCache *output_chunk_cache; + GList *command_callbacks; /* chunk currently being written */ GstRtmpChunk *output_chunk; @@ -75,11 +81,10 @@ struct _GstRtmpConnectionClass GObjectClass object_class; /* signals */ - void (*got_chunk) (GstRtmpConnection *connection, + void (*got_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk); + void (*got_control_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk); - void (*got_command) (GstRtmpConnection *connection, - const char *command_name, int transaction_id, - GstAmfNode *command_object, GstAmfNode *option_args); + void (*closed) (GstRtmpConnection *connection); }; GType gst_rtmp_connection_get_type (void); @@ -94,7 +99,13 @@ void gst_rtmp_connection_dump (GstRtmpConnection *connection); int gst_rtmp_connection_send_command (GstRtmpConnection *connection, int stream_id, const char *command_name, int transaction_id, - GstAmfNode *command_object, GstAmfNode *optional_args); + GstAmfNode *command_object, GstAmfNode *optional_args, + GstRtmpCommandCallback response_command, gpointer user_data); +int gst_rtmp_connection_send_command2 (GstRtmpConnection *connection, + int chunk_stream_id, int stream_id, const char *command_name, + int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args, + GstAmfNode *n3, GstAmfNode *n4, + GstRtmpCommandCallback response_command, gpointer user_data); G_END_DECLS diff --git a/tools/client-test.c b/tools/client-test.c index 7a9efbe..1250526 100644 --- a/tools/client-test.c +++ b/tools/client-test.c @@ -24,6 +24,7 @@ #include <gst/gst.h> #include <gio/gio.h> #include <stdlib.h> +#include <string.h> #include "rtmpclient.h" #include "rtmputils.h" @@ -44,9 +45,20 @@ static GOptionEntry entries[] = { static void connect_done (GObject * source, GAsyncResult * result, gpointer user_data); +static void cmd_connect_done (GstRtmpConnection *connection, GstRtmpChunk *chunk, + const char *command_name, int transaction_id, GstAmfNode *command_object, + GstAmfNode *optional_args, gpointer user_data); static void send_connect (void); static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data); +static void send_create_stream (void); +static void create_stream_done (GstRtmpConnection *connection, GstRtmpChunk *chunk, + const char *command_name, int transaction_id, GstAmfNode *command_object, + GstAmfNode *optional_args, gpointer user_data); +static void send_play (void); +static void play_done (GstRtmpConnection *connection, GstRtmpChunk *chunk, + const char *command_name, int transaction_id, GstAmfNode *command_object, + GstAmfNode *optional_args, gpointer user_data); int main (int argc, char *argv[]) @@ -68,7 +80,7 @@ main (int argc, char *argv[]) client = gst_rtmp_client_new (); gst_rtmp_client_set_server_address (client, - "ec2-54-189-67-158.us-west-2.compute.amazonaws.com"); + "ec2-54-185-55-241.us-west-2.compute.amazonaws.com"); cancellable = g_cancellable_new (); main_loop = g_main_loop_new (NULL, TRUE); @@ -91,10 +103,9 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data) if (!ret) { GST_ERROR ("error: %s", error->message); g_error_free (error); + return; } - GST_ERROR ("got here"); - connection = gst_rtmp_client_get_connection (client); g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL); @@ -102,18 +113,6 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data) } static void -send_connect (void) -{ - GstAmfNode *node; - - node = gst_amf_node_new (GST_AMF_TYPE_OBJECT); - gst_amf_object_set_string (node, "app", "live"); - gst_amf_object_set_string (node, "type", "nonprivate"); - gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live"); - gst_rtmp_connection_send_command (connection, 3, "connect", 1, node, NULL); -} - -static void dump_command (GstRtmpChunk * chunk) { GstAmfNode *amf; @@ -143,7 +142,10 @@ dump_chunk (GstRtmpChunk * chunk, gboolean dir) chunk->stream_id, chunk->timestamp, chunk->message_length, chunk->message_type_id, chunk->info); - if (chunk->stream_id == 3 && chunk->message_type_id == 20) { + if (chunk->message_type_id == 20) { + dump_command (chunk); + } + if (chunk->message_type_id == 18) { dump_command (chunk); } gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk)); @@ -153,5 +155,119 @@ static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data) { - dump_chunk (chunk, TRUE); + dump_chunk (chunk, FALSE); +} + +static void +send_connect (void) +{ + GstAmfNode *node; + + node = gst_amf_node_new (GST_AMF_TYPE_OBJECT); + gst_amf_object_set_string (node, "app", "live"); + gst_amf_object_set_string (node, "type", "nonprivate"); + gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live"); + // "fpad": False, + // "capabilities": 15, + // "audioCodecs": 3191, + // "videoCodecs": 252, + // "videoFunction": 1, + gst_rtmp_connection_send_command (connection, 3, "connect", 1, node, NULL, cmd_connect_done, NULL); +} + +static void +cmd_connect_done (GstRtmpConnection *connection, GstRtmpChunk *chunk, + const char *command_name, int transaction_id, GstAmfNode *command_object, + GstAmfNode *optional_args, gpointer user_data) +{ + gboolean ret; + + ret = FALSE; + if (optional_args) { + const GstAmfNode *n; + n = gst_amf_node_get_object (optional_args, "code"); + if (n) { + const char *s; + s = gst_amf_node_get_string (n); + if (strcmp (s, "NetConnection.Connect.Success") == 0) { + ret = TRUE; + } + } + } + + if (ret) { + GST_ERROR("success"); + + send_create_stream (); + } +} + +static void +send_create_stream (void) +{ + GstAmfNode *node; + + node = gst_amf_node_new (GST_AMF_TYPE_NULL); + gst_rtmp_connection_send_command (connection, 3, "createStream", 2, node, NULL, create_stream_done, NULL); + +} + +static void +create_stream_done (GstRtmpConnection *connection, GstRtmpChunk *chunk, + const char *command_name, int transaction_id, GstAmfNode *command_object, + GstAmfNode *optional_args, gpointer user_data) +{ + gboolean ret; + int stream_id; + + ret = FALSE; + if (optional_args) { + stream_id = gst_amf_node_get_number (optional_args); + ret = TRUE; + } + + if (ret) { + GST_ERROR("createStream success, stream_id=%d", stream_id); + + send_play (); + } +} + +static void +send_play (void) +{ + GstAmfNode *n1; + GstAmfNode *n2; + GstAmfNode *n3; + + n1 = gst_amf_node_new (GST_AMF_TYPE_NULL); + n2 = gst_amf_node_new (GST_AMF_TYPE_STRING); + gst_amf_node_set_string (n2, "myStream"); + n3 = gst_amf_node_new (GST_AMF_TYPE_NUMBER); + gst_amf_node_set_number (n3, 0); + gst_rtmp_connection_send_command2 (connection, 8, 1, "play", 3, n1, n2, n3, NULL, play_done, NULL); + +} + +static void +play_done (GstRtmpConnection *connection, GstRtmpChunk *chunk, + const char *command_name, int transaction_id, GstAmfNode *command_object, + GstAmfNode *optional_args, gpointer user_data) +{ + gboolean ret; + int stream_id; + + ret = FALSE; + if (optional_args) { + stream_id = gst_amf_node_get_number (optional_args); + ret = TRUE; + } + + if (ret) { + GST_ERROR("play success, stream_id=%d", stream_id); + + } } + + + diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 0ab122a..0d78fcb 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -85,7 +85,7 @@ main (int argc, char *argv[]) client = gst_rtmp_client_new (); gst_rtmp_client_set_server_address (client, - "ec2-54-189-67-158.us-west-2.compute.amazonaws.com"); + "ec2-54-185-55-241.us-west-2.compute.amazonaws.com"); cancellable = g_cancellable_new (); if (verbose) @@ -219,7 +219,7 @@ dump_chunk (GstRtmpChunk * chunk, gboolean dir) chunk->stream_id, chunk->timestamp, chunk->message_length, chunk->message_type_id, chunk->info); - if (chunk->stream_id == 3 && chunk->message_type_id == 20) { + if (chunk->message_type_id == 0x14 || chunk->message_type_id == 0x18) { dump_command (chunk); } gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk)); |