diff options
author | David Schleef <ds@schleef.org> | 2014-09-01 13:38:54 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-09-01 13:38:54 -0700 |
commit | 2002d4ddef7c0c4fe9a3701b93c0b567fb205ebc (patch) | |
tree | 6aa5f6d4e5a82f56083280361fa9624f12bc5cec | |
parent | 6fd24b0c5e6bab6a98af932cbca351b612264e04 (diff) |
hacking
-rw-r--r-- | plugins/gstrtmp2sink.c | 633 | ||||
-rw-r--r-- | plugins/gstrtmp2sink.h | 23 | ||||
-rw-r--r-- | plugins/gstrtmp2src.c | 115 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 5 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 17 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 4 | ||||
-rw-r--r-- | rtmp/rtmpserver.c | 3 |
7 files changed, 622 insertions, 178 deletions
diff --git a/plugins/gstrtmp2sink.c b/plugins/gstrtmp2sink.c index b6d4713..fedaa06 100644 --- a/plugins/gstrtmp2sink.c +++ b/plugins/gstrtmp2sink.c @@ -25,7 +25,8 @@ * <refsect2> * <title>Example launch line</title> * |[ - * gst-launch -v fakesrc ! rtmp2sink ! FIXME ! fakesink + * gst-launch -v videotestsrc ! x264enc ! flvmux ! rtmp2sink + * location=rtmp://server.example.com/live/myStream * ]| * FIXME Describe what the pipeline does. * </refsect2> @@ -38,53 +39,88 @@ #include <gst/gst.h> #include <gst/base/gstbasesink.h> #include "gstrtmp2sink.h" +#include <string.h> GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category); #define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category /* prototypes */ - +/* GObject virtual functions */ static void gst_rtmp2_sink_set_property (GObject * object, guint property_id, const GValue * value, GParamSpec * pspec); static void gst_rtmp2_sink_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp2_sink_dispose (GObject * object); static void gst_rtmp2_sink_finalize (GObject * object); +static void gst_rtmp2_sink_uri_handler_init (gpointer g_iface, + gpointer iface_data); -static GstCaps *gst_rtmp2_sink_get_caps (GstBaseSink * sink, GstCaps * filter); -static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps); -static GstCaps *gst_rtmp2_sink_fixate (GstBaseSink * sink, GstCaps * caps); -static gboolean gst_rtmp2_sink_activate_pull (GstBaseSink * sink, - gboolean active); +/* GstBaseSink virtual functions */ static void gst_rtmp2_sink_get_times (GstBaseSink * sink, GstBuffer * buffer, GstClockTime * start, GstClockTime * end); -static gboolean gst_rtmp2_sink_propose_allocation (GstBaseSink * sink, - GstQuery * query); static gboolean gst_rtmp2_sink_start (GstBaseSink * sink); static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink); static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink); static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink); static gboolean gst_rtmp2_sink_query (GstBaseSink * sink, GstQuery * query); static gboolean gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event); -static GstFlowReturn gst_rtmp2_sink_wait_event (GstBaseSink * sink, - GstEvent * event); -static GstFlowReturn gst_rtmp2_sink_prepare (GstBaseSink * sink, - GstBuffer * buffer); -static GstFlowReturn gst_rtmp2_sink_prepare_list (GstBaseSink * sink, - GstBufferList * buffer_list); -static GstFlowReturn gst_rtmp2_sink_preroll (GstBaseSink * sink, - GstBuffer * buffer); static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer); -static GstFlowReturn gst_rtmp2_sink_render_list (GstBaseSink * sink, - GstBufferList * buffer_list); + +/* URI handler */ +static GstURIType gst_rtmp2_sink_uri_get_type (GType type); +static const gchar *const *gst_rtmp2_sink_uri_get_protocols (GType type); +static gchar *gst_rtmp2_sink_uri_get_uri (GstURIHandler * handler); +static gboolean gst_rtmp2_sink_uri_set_uri (GstURIHandler * handler, + const gchar * uri, GError ** error); + +/* Internal API */ +static gchar *gst_rtmp2_sink_get_uri (GstRtmp2Sink * sink); +static gboolean gst_rtmp2_sink_set_uri (GstRtmp2Sink * sink, const char *uri); +static void gst_rtmp2_sink_task (gpointer user_data); +static void connect_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void send_connect (GstRtmp2Sink * rtmp2sink); +static void cmd_connect_done (GstRtmpConnection * connection, + GstRtmpChunk * chunk, const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args, + gpointer user_data); +static void send_create_stream (GstRtmp2Sink * rtmp2sink); +static void create_stream_done (GstRtmpConnection * connection, + GstRtmpChunk * chunk, const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args, + gpointer user_data); +static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data); +static void dump_command (GstRtmpChunk * chunk); +static void dump_chunk (GstRtmpChunk * chunk, gboolean dir); +static void send_secure_token_response (GstRtmp2Sink * rtmp2sink, + const char *challenge); + enum { - PROP_0 + PROP_0, + PROP_LOCATION, + PROP_TIMEOUT, + PROP_SERVER_ADDRESS, + PROP_PORT, + PROP_APPLICATION, + PROP_STREAM, + PROP_SECURE_TOKEN }; +#define DEFAULT_LOCATION "rtmp://localhost/live/myStream" +#define DEFAULT_TIMEOUT 5 +#define DEFAULT_SERVER_ADDRESS "" +#define DEFAULT_PORT 1935 +#define DEFAULT_APPLICATION "live" +#define DEFAULT_STREAM "myStream" +//#define DEFAULT_SECURE_TOKEN "" +/* FIXME for testing only */ +#define DEFAULT_SECURE_TOKEN "4305c027c2758beb" + /* pad templates */ static GstStaticPadTemplate gst_rtmp2_sink_sink_template = @@ -97,12 +133,18 @@ GST_STATIC_PAD_TEMPLATE ("sink", /* class initialization */ +static void +do_init (GType g_define_type_id) +{ + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtmp2_sink_uri_handler_init); + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0, + "debug category for rtmp2sink element"); +} + G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK, - GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0, - "debug category for rtmp2sink element")); + do_init (g_define_type_id)) -static void -gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass) + static void gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass); @@ -113,40 +155,66 @@ gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass) gst_static_pad_template_get (&gst_rtmp2_sink_sink_template)); gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass), - "FIXME Long name", "Generic", "FIXME Description", - "FIXME <fixme@example.com>"); + "RTMP sink element", "Sink", "Sink element for publishing RTMP streams", + "David Schleef <ds@schleef.org>"); gobject_class->set_property = gst_rtmp2_sink_set_property; gobject_class->get_property = gst_rtmp2_sink_get_property; gobject_class->dispose = gst_rtmp2_sink_dispose; gobject_class->finalize = gst_rtmp2_sink_finalize; - base_sink_class->get_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_get_caps); - base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps); - base_sink_class->fixate = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_fixate); - base_sink_class->activate_pull = - GST_DEBUG_FUNCPTR (gst_rtmp2_sink_activate_pull); base_sink_class->get_times = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_get_times); - base_sink_class->propose_allocation = - GST_DEBUG_FUNCPTR (gst_rtmp2_sink_propose_allocation); base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start); base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop); base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock); base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop); - base_sink_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_query); + if (0) + base_sink_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_query); base_sink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_event); - base_sink_class->wait_event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_wait_event); - base_sink_class->prepare = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_prepare); - base_sink_class->prepare_list = - GST_DEBUG_FUNCPTR (gst_rtmp2_sink_prepare_list); - base_sink_class->preroll = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_preroll); base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render); - base_sink_class->render_list = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render_list); + + g_object_class_install_property (gobject_class, PROP_LOCATION, + g_param_spec_string ("location", "RTMP Location", + "Location of the RTMP url to read", + DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SERVER_ADDRESS, + g_param_spec_string ("server-address", "RTMP Server Address", + "Address of RTMP server", + DEFAULT_SERVER_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PORT, + g_param_spec_int ("port", "RTMP server port", + "RTMP server port (usually 1935)", + 1, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_APPLICATION, + g_param_spec_string ("application", "RTMP application", + "RTMP application", + DEFAULT_APPLICATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_STREAM, + g_param_spec_string ("stream", "RTMP stream", + "RTMP stream", + DEFAULT_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SECURE_TOKEN, + g_param_spec_string ("secure-token", "Secure token", + "Secure token used for authentication", + DEFAULT_SECURE_TOKEN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void gst_rtmp2_sink_init (GstRtmp2Sink * rtmp2sink) { + rtmp2sink->timeout = DEFAULT_TIMEOUT; + gst_rtmp2_sink_set_uri (rtmp2sink, DEFAULT_LOCATION); + rtmp2sink->secure_token = g_strdup (DEFAULT_SECURE_TOKEN); + + g_mutex_init (&rtmp2sink->lock); + g_cond_init (&rtmp2sink->cond); + rtmp2sink->task = gst_task_new (gst_rtmp2_sink_task, rtmp2sink, NULL); + g_rec_mutex_init (&rtmp2sink->task_lock); + gst_task_set_lock (rtmp2sink->task, &rtmp2sink->task_lock); + rtmp2sink->client = gst_rtmp_client_new (); + rtmp2sink->connection = gst_rtmp_client_get_connection (rtmp2sink->client); + + g_object_set (rtmp2sink->client, "timeout", rtmp2sink->timeout, NULL); } void @@ -158,6 +226,28 @@ gst_rtmp2_sink_set_property (GObject * object, guint property_id, GST_DEBUG_OBJECT (rtmp2sink, "set_property"); switch (property_id) { + case PROP_LOCATION: + gst_rtmp2_sink_set_uri (rtmp2sink, g_value_get_string (value)); + break; + case PROP_SERVER_ADDRESS: + g_free (rtmp2sink->server_address); + rtmp2sink->server_address = g_value_dup_string (value); + break; + case PROP_PORT: + rtmp2sink->port = g_value_get_int (value); + break; + case PROP_APPLICATION: + g_free (rtmp2sink->application); + rtmp2sink->application = g_value_dup_string (value); + break; + case PROP_STREAM: + g_free (rtmp2sink->stream); + rtmp2sink->stream = g_value_dup_string (value); + break; + case PROP_SECURE_TOKEN: + g_free (rtmp2sink->secure_token); + rtmp2sink->secure_token = g_value_dup_string (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -173,6 +263,24 @@ gst_rtmp2_sink_get_property (GObject * object, guint property_id, GST_DEBUG_OBJECT (rtmp2sink, "get_property"); switch (property_id) { + case PROP_LOCATION: + g_value_set_string (value, gst_rtmp2_sink_get_uri (rtmp2sink)); + break; + case PROP_SERVER_ADDRESS: + g_value_set_string (value, rtmp2sink->server_address); + break; + case PROP_PORT: + g_value_set_int (value, rtmp2sink->port); + break; + case PROP_APPLICATION: + g_value_set_string (value, rtmp2sink->application); + break; + case PROP_STREAM: + g_value_set_string (value, rtmp2sink->stream); + break; + case PROP_SECURE_TOKEN: + g_value_set_string (value, rtmp2sink->secure_token); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -199,51 +307,24 @@ gst_rtmp2_sink_finalize (GObject * object) GST_DEBUG_OBJECT (rtmp2sink, "finalize"); /* clean up object here */ + g_object_unref (rtmp2sink->task); + g_rec_mutex_clear (&rtmp2sink->task_lock); + g_object_unref (rtmp2sink->client); + g_mutex_clear (&rtmp2sink->lock); + g_cond_clear (&rtmp2sink->cond); G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object); } -static GstCaps * -gst_rtmp2_sink_get_caps (GstBaseSink * sink, GstCaps * filter) -{ - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); - - GST_DEBUG_OBJECT (rtmp2sink, "get_caps"); - - return NULL; -} - -/* notify subclass of new caps */ -static gboolean -gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps) -{ - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); - - GST_DEBUG_OBJECT (rtmp2sink, "set_caps"); - - return TRUE; -} - -/* fixate sink caps during pull-mode negotiation */ -static GstCaps * -gst_rtmp2_sink_fixate (GstBaseSink * sink, GstCaps * caps) -{ - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); - - GST_DEBUG_OBJECT (rtmp2sink, "fixate"); - - return NULL; -} - -/* start or stop a pulling thread */ -static gboolean -gst_rtmp2_sink_activate_pull (GstBaseSink * sink, gboolean active) +static void +gst_rtmp2_sink_uri_handler_init (gpointer g_iface, gpointer iface_data) { - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); - - GST_DEBUG_OBJECT (rtmp2sink, "activate_pull"); + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; - return TRUE; + iface->get_type = gst_rtmp2_sink_uri_get_type; + iface->get_protocols = gst_rtmp2_sink_uri_get_protocols; + iface->get_uri = gst_rtmp2_sink_uri_get_uri; + iface->set_uri = gst_rtmp2_sink_uri_set_uri; } /* get the start and end times for syncing on this buffer */ @@ -257,17 +338,6 @@ gst_rtmp2_sink_get_times (GstBaseSink * sink, GstBuffer * buffer, } -/* propose allocation parameters for upstream */ -static gboolean -gst_rtmp2_sink_propose_allocation (GstBaseSink * sink, GstQuery * query) -{ - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); - - GST_DEBUG_OBJECT (rtmp2sink, "propose_allocation"); - - return TRUE; -} - /* start and stop processing, ideal for opening/closing the resource */ static gboolean gst_rtmp2_sink_start (GstBaseSink * sink) @@ -276,6 +346,8 @@ gst_rtmp2_sink_start (GstBaseSink * sink) GST_DEBUG_OBJECT (rtmp2sink, "start"); + gst_task_start (rtmp2sink->task); + return TRUE; } @@ -298,6 +370,11 @@ gst_rtmp2_sink_unlock (GstBaseSink * sink) GST_DEBUG_OBJECT (rtmp2sink, "unlock"); + rtmp2sink->reset = TRUE; + g_mutex_lock (&rtmp2sink->lock); + g_cond_signal (&rtmp2sink->cond); + g_mutex_unlock (&rtmp2sink->lock); + return TRUE; } @@ -311,6 +388,11 @@ gst_rtmp2_sink_unlock_stop (GstBaseSink * sink) GST_DEBUG_OBJECT (rtmp2sink, "unlock_stop"); + gst_task_stop (rtmp2sink->task); + if (rtmp2sink->task_main_loop) { + g_main_loop_quit (rtmp2sink->task_main_loop); + } + return TRUE; } @@ -336,66 +418,375 @@ gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event) return TRUE; } -/* wait for eos or gap, subclasses should chain up to parent first */ static GstFlowReturn -gst_rtmp2_sink_wait_event (GstBaseSink * sink, GstEvent * event) +gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer) { GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); + GstRtmpChunk *chunk; + GBytes *bytes; + gsize size; + guint8 *data; - GST_DEBUG_OBJECT (rtmp2sink, "wait_event"); + GST_DEBUG_OBJECT (rtmp2sink, "render"); + + size = gst_buffer_get_size (buffer); + gst_buffer_extract_dup (buffer, 0, size, (gpointer *) & data, &size); + + if (size >= 4) { + if (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') { + /* drop the header, we don't need it */ + g_free (data); + return GST_FLOW_OK; + } + } + + if (size < 15) { + g_free (data); + return GST_FLOW_ERROR; + } + + chunk = gst_rtmp_chunk_new (); + chunk->message_type_id = data[0]; + if (chunk->message_type_id == 18) { + chunk->stream_id = 5; + } else if (chunk->message_type_id == 9) { + chunk->stream_id = 7; + } else { + GST_ERROR ("unknown message_type_id %d", chunk->message_type_id); + } + chunk->message_length = GST_READ_UINT24_BE (data + 1); + chunk->timestamp = GST_READ_UINT24_BE (data + 4); + chunk->info = 1; /* FIXME use actual stream id */ + + if (chunk->message_length != size - 15) { + GST_ERROR ("message length was %" G_GSIZE_FORMAT " expected %" + G_GSIZE_FORMAT, chunk->message_length, size - 15); + } + + bytes = g_bytes_new_take (data, size); + chunk->payload = g_bytes_new_from_bytes (bytes, 11, size - 15); + + if (rtmp2sink->dump) { + dump_chunk (chunk, TRUE); + } + + gst_rtmp_connection_queue_chunk (rtmp2sink->connection, chunk); return GST_FLOW_OK; } -/* notify subclass of buffer or list before doing sync */ -static GstFlowReturn -gst_rtmp2_sink_prepare (GstBaseSink * sink, GstBuffer * buffer) + +/* URL handler */ + +static GstURIType +gst_rtmp2_sink_uri_get_type (GType type) { - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); + return GST_URI_SINK; +} - GST_DEBUG_OBJECT (rtmp2sink, "prepare"); +static const gchar *const * +gst_rtmp2_sink_uri_get_protocols (GType type) +{ + static const gchar *protocols[] = { "rtmp", NULL }; - return GST_FLOW_OK; + return protocols; } -static GstFlowReturn -gst_rtmp2_sink_prepare_list (GstBaseSink * sink, GstBufferList * buffer_list) +static gchar * +gst_rtmp2_sink_uri_get_uri (GstURIHandler * handler) { - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); + GstRtmp2Sink *sink = GST_RTMP2_SINK (handler); - GST_DEBUG_OBJECT (rtmp2sink, "prepare_list"); + return gst_rtmp2_sink_get_uri (sink); +} - return GST_FLOW_OK; +static gboolean +gst_rtmp2_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri, + GError ** error) +{ + GstRtmp2Sink *sink = GST_RTMP2_SINK (handler); + gboolean ret; + + ret = gst_rtmp2_sink_set_uri (sink, uri); + if (!ret && error) { + *error = + g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED, + "Invalid URI"); + } + + return ret; } -/* notify subclass of preroll buffer or real buffer */ -static GstFlowReturn -gst_rtmp2_sink_preroll (GstBaseSink * sink, GstBuffer * buffer) +/* Internal API */ + +static gchar * +gst_rtmp2_sink_get_uri (GstRtmp2Sink * sink) { - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); + if (sink->port == 1935) { + return g_strdup_printf ("rtmp://%s/%s/%s", sink->server_address, + sink->application, sink->stream); + } else { + return g_strdup_printf ("rtmp://%s:%d/%s/%s", sink->server_address, + sink->port, sink->application, sink->stream); + } +} + +/* It would really be awesome if GStreamer had full URI parsing. Alas. */ +/* FIXME this function needs more error checking, and testing */ +static gboolean +gst_rtmp2_sink_set_uri (GstRtmp2Sink * sink, const char *uri) +{ + gchar *location; + gchar **parts; + gchar **parts2; + gboolean ret; - GST_DEBUG_OBJECT (rtmp2sink, "preroll"); + GST_DEBUG ("setting uri to %s", uri); - return GST_FLOW_OK; + if (!gst_uri_has_protocol (uri, "rtmp")) + return FALSE; + + location = gst_uri_get_location (uri); + + parts = g_strsplit (location, "/", 3); + if (parts[0] == NULL || parts[1] == NULL || parts[2] == NULL) { + ret = FALSE; + goto out; + } + + parts2 = g_strsplit (parts[0], ":", 2); + if (parts2[1]) { + sink->port = g_ascii_strtoull (parts2[1], NULL, 10); + } else { + sink->port = 1935; + } + g_free (sink->server_address); + sink->server_address = g_strdup (parts2[0]); + g_strfreev (parts2); + + g_free (sink->application); + sink->application = g_strdup (parts[1]); + g_free (sink->stream); + sink->stream = g_strdup (parts[2]); + + ret = TRUE; + +out: + g_free (location); + g_strfreev (parts); + return ret; } -static GstFlowReturn -gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer) +static void +gst_rtmp2_sink_task (gpointer user_data) { - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); + GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data); + GMainLoop *main_loop; + GMainContext *main_context; + + GST_ERROR ("gst_rtmp2_sink_task starting"); + + gst_rtmp_client_set_server_address (rtmp2sink->client, + rtmp2sink->server_address); + gst_rtmp_client_connect_async (rtmp2sink->client, NULL, connect_done, + rtmp2sink); + + main_context = g_main_context_new (); + main_loop = g_main_loop_new (main_context, TRUE); + rtmp2sink->task_main_loop = main_loop; + g_main_loop_run (main_loop); + rtmp2sink->task_main_loop = NULL; + g_main_loop_unref (main_loop); + g_main_context_unref (main_context); + + GST_ERROR ("gst_rtmp2_sink_task exiting"); +} - GST_DEBUG_OBJECT (rtmp2sink, "render"); +static void +connect_done (GObject * source, GAsyncResult * result, gpointer user_data) +{ + GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data); + GError *error = NULL; + gboolean ret; + + ret = gst_rtmp_client_connect_finish (rtmp2sink->client, result, &error); + if (!ret) { + GST_ELEMENT_ERROR (rtmp2sink, RESOURCE, OPEN_READ, + ("Could not connect to server"), ("%s", error->message)); + g_error_free (error); + return; + } - return GST_FLOW_OK; + g_signal_connect (rtmp2sink->connection, "got-chunk", G_CALLBACK (got_chunk), + rtmp2sink); + + send_connect (rtmp2sink); } -/* Render a BufferList */ -static GstFlowReturn -gst_rtmp2_sink_render_list (GstBaseSink * sink, GstBufferList * buffer_list) +static void +send_connect (GstRtmp2Sink * rtmp2sink) { - GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink); + GstAmfNode *node; + gchar *uri; + + node = gst_amf_node_new (GST_AMF_TYPE_OBJECT); + gst_amf_object_set_string (node, "app", rtmp2sink->application); + gst_amf_object_set_string (node, "type", "nonprivate"); + uri = gst_rtmp2_sink_get_uri (rtmp2sink); + gst_amf_object_set_string (node, "tcUrl", uri); + g_free (uri); + // "fpad": False, + // "capabilities": 15, + // "audioCodecs": 3191, + // "videoCodecs": 252, + // "videoFunction": 1, + gst_rtmp_connection_send_command (rtmp2sink->connection, 3, "connect", 1, + node, NULL, cmd_connect_done, rtmp2sink); +} + +static void +cmd_connect_done (GstRtmpConnection * connection, GstRtmpChunk * chunk, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, gpointer user_data) +{ + GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data); + gboolean ret; + + ret = FALSE; + if (optional_args) { + const GstAmfNode *n; + n = gst_amf_node_get_object (optional_args, "code"); + if (n) { + const char *s; + s = gst_amf_node_get_string (n); + if (strcmp (s, "NetConnection.Connect.Success") == 0) { + ret = TRUE; + } + } + } - GST_DEBUG_OBJECT (rtmp2sink, "render_list"); + if (ret) { + const GstAmfNode *n; + + GST_DEBUG ("success"); + + n = gst_amf_node_get_object (optional_args, "secureToken"); + if (n) { + const gchar *challenge; + challenge = gst_amf_node_get_string (n); + GST_DEBUG ("secureToken challenge: %s", challenge); + send_secure_token_response (rtmp2sink, challenge); + } + + send_create_stream (rtmp2sink); + } else { + GST_ERROR ("connect error"); + } +} + +static void +send_create_stream (GstRtmp2Sink * rtmp2sink) +{ + GstAmfNode *node; + + node = gst_amf_node_new (GST_AMF_TYPE_NULL); + gst_rtmp_connection_send_command (rtmp2sink->connection, 3, "createStream", 2, + node, NULL, create_stream_done, rtmp2sink); + +} + +static void +create_stream_done (GstRtmpConnection * connection, GstRtmpChunk * chunk, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, gpointer user_data) +{ + //GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data); + gboolean ret; + int stream_id; + + ret = FALSE; + if (optional_args) { + stream_id = gst_amf_node_get_number (optional_args); + ret = TRUE; + } + + if (ret) { + GST_DEBUG ("createStream success, stream_id=%d", stream_id); + //send_play (rtmp2sink); + } else { + GST_ERROR ("createStream failed"); + } +} + +static void +got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data) +{ + GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data); + + if (rtmp2sink->dump) { + dump_chunk (chunk, FALSE); + } +} + +static void +dump_command (GstRtmpChunk * chunk) +{ + GstAmfNode *amf; + gsize size; + const guint8 *data; + gsize n_parsed; + int offset; + + offset = 0; + data = g_bytes_get_data (chunk->payload, &size); + while (offset < size) { + amf = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + gst_amf_node_dump (amf); + gst_amf_node_free (amf); + offset += n_parsed; + } +} + +static void +dump_chunk (GstRtmpChunk * chunk, gboolean dir) +{ + g_print ("%s stream_id:%-4d ts:%-8d len:%-6" G_GSIZE_FORMAT + " type_id:%-4d info:%08x\n", dir ? ">>>" : "<<<", + chunk->stream_id, + chunk->timestamp, + chunk->message_length, chunk->message_type_id, chunk->info); + if (chunk->message_type_id == 20) { + dump_command (chunk); + } + if (chunk->message_type_id == 18) { + dump_command (chunk); + } + gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk)); +} + +static void +send_secure_token_response (GstRtmp2Sink * rtmp2sink, const char *challenge) +{ + GstAmfNode *node1; + GstAmfNode *node2; + gchar *response; + + if (rtmp2sink->secure_token == NULL || !rtmp2sink->secure_token[0]) { + GST_ELEMENT_ERROR (rtmp2sink, RESOURCE, OPEN_READ, + ("Server requested secureToken authentication"), (NULL)); + return; + } + + response = gst_rtmp_tea_decode (rtmp2sink->secure_token, challenge); + + GST_DEBUG ("response: %s", response); + + node1 = gst_amf_node_new (GST_AMF_TYPE_NULL); + node2 = gst_amf_node_new (GST_AMF_TYPE_STRING); + gst_amf_node_set_string_take (node2, response); + + gst_rtmp_connection_send_command (rtmp2sink->connection, 3, + "secureTokenResponse", 0, node1, node2, NULL, NULL); - return GST_FLOW_OK; } diff --git a/plugins/gstrtmp2sink.h b/plugins/gstrtmp2sink.h index bccb110..eed580d 100644 --- a/plugins/gstrtmp2sink.h +++ b/plugins/gstrtmp2sink.h @@ -21,6 +21,8 @@ #define _GST_RTMP2_SINK_H_ #include <gst/base/gstbasesink.h> +#include <rtmp/rtmpclient.h> +#include <rtmp/rtmputils.h> G_BEGIN_DECLS @@ -37,6 +39,27 @@ struct _GstRtmp2Sink { GstBaseSink base_rtmp2sink; + /* properties */ + char *uri; + int timeout; + char *server_address; + int port; + char *application; + char *stream; + char *secure_token; + + /* stuff */ + GMutex lock; + GCond cond; + gboolean reset; + GstTask *task; + GRecMutex task_lock; + GMainLoop *task_main_loop; + + GstRtmpClient *client; + GstRtmpConnection *connection; + gboolean dump; + }; struct _GstRtmp2SinkClass diff --git a/plugins/gstrtmp2src.c b/plugins/gstrtmp2src.c index 4043b92..2811733 100644 --- a/plugins/gstrtmp2src.c +++ b/plugins/gstrtmp2src.c @@ -46,7 +46,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category); /* prototypes */ - +/* GObject virtual functions */ static void gst_rtmp2_src_set_property (GObject * object, guint property_id, const GValue * value, GParamSpec * pspec); static void gst_rtmp2_src_get_property (GObject * object, @@ -56,7 +56,7 @@ static void gst_rtmp2_src_finalize (GObject * object); static void gst_rtmp2_src_uri_handler_init (gpointer g_iface, gpointer iface_data); -static void gst_rtmp2_src_task (gpointer user_data); +/* GstBaseSrc virtual functions */ static gboolean gst_rtmp2_src_start (GstBaseSrc * src); static gboolean gst_rtmp2_src_stop (GstBaseSrc * src); static void gst_rtmp2_src_get_times (GstBaseSrc * src, GstBuffer * buffer, @@ -75,6 +75,15 @@ static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, static GstFlowReturn gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset, guint size, GstBuffer ** buf); +/* URI handler */ +static GstURIType gst_rtmp2_src_uri_get_type (GType type); +static const gchar *const *gst_rtmp2_src_uri_get_protocols (GType type); +static gchar *gst_rtmp2_src_uri_get_uri (GstURIHandler * handler); +static gboolean gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, + const gchar * uri, GError ** error); + +/* Internal API */ +static void gst_rtmp2_src_task (gpointer user_data); static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data); static void connect_done (GObject * source, GAsyncResult * result, @@ -133,16 +142,18 @@ GST_STATIC_PAD_TEMPLATE ("src", /* class initialization */ +static void +do_init (GType g_define_type_id) +{ + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtmp2_src_uri_handler_init); + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, + "debug category for rtmp2src element"); +} + G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC, - do { - G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, - gst_rtmp2_src_uri_handler_init); - GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, - "debug category for rtmp2src element"); - } while (0)); + do_init (g_define_type_id)) -static void -gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) + static void gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass); @@ -207,6 +218,7 @@ static void gst_rtmp2_src_init (GstRtmp2Src * rtmp2src) { g_mutex_init (&rtmp2src->lock); + g_cond_init (&rtmp2src->cond); rtmp2src->queue = g_queue_new (); //gst_base_src_set_live (GST_BASE_SRC(rtmp2src), TRUE); @@ -223,45 +235,6 @@ gst_rtmp2_src_init (GstRtmp2Src * rtmp2src) } -static GstURIType -gst_rtmp2_src_uri_get_type (GType type) -{ - return GST_URI_SRC; -} - -static const gchar *const * -gst_rtmp2_src_uri_get_protocols (GType type) -{ - static const gchar *protocols[] = { "rtmp", NULL }; - - return protocols; -} - -static gchar * -gst_rtmp2_src_uri_get_uri (GstURIHandler * handler) -{ - GstRtmp2Src *src = GST_RTMP2_SRC (handler); - - return gst_rtmp2_src_get_uri (src); -} - -static gboolean -gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, - GError ** error) -{ - GstRtmp2Src *src = GST_RTMP2_SRC (handler); - gboolean ret; - - ret = gst_rtmp2_src_set_uri (src, uri); - if (!ret && error) { - *error = - g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED, - "Invalid URI"); - } - - return ret; -} - static void gst_rtmp2_src_uri_handler_init (gpointer g_iface, gpointer iface_data) { @@ -367,6 +340,7 @@ gst_rtmp2_src_finalize (GObject * object) g_rec_mutex_clear (&rtmp2src->task_lock); g_object_unref (rtmp2src->client); g_mutex_clear (&rtmp2src->lock); + g_cond_clear (&rtmp2src->cond); g_queue_free_full (rtmp2src->queue, g_object_unref); G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object); @@ -821,6 +795,49 @@ gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset, guint size, return GST_FLOW_OK; } + +/* URL handler */ + +static GstURIType +gst_rtmp2_src_uri_get_type (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +gst_rtmp2_src_uri_get_protocols (GType type) +{ + static const gchar *protocols[] = { "rtmp", NULL }; + + return protocols; +} + +static gchar * +gst_rtmp2_src_uri_get_uri (GstURIHandler * handler) +{ + GstRtmp2Src *src = GST_RTMP2_SRC (handler); + + return gst_rtmp2_src_get_uri (src); +} + +static gboolean +gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, + GError ** error) +{ + GstRtmp2Src *src = GST_RTMP2_SRC (handler); + gboolean ret; + + ret = gst_rtmp2_src_set_uri (src, uri); + if (!ret && error) { + *error = + g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED, + "Invalid URI"); + } + + return ret; +} + + /* internal API */ static gchar * diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index ecec842..ed7c0d1 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -99,6 +99,8 @@ gst_rtmp_client_init (GstRtmpClient * rtmpclient) { rtmpclient->server_address = g_strdup (DEFAULT_SERVER_ADDRESS); rtmpclient->server_port = DEFAULT_SERVER_PORT; + + rtmpclient->connection = gst_rtmp_connection_new (); } void @@ -256,7 +258,8 @@ gst_rtmp_client_connect_done (GObject * source, GAsyncResult * result, return; } - client->connection = gst_rtmp_connection_new (client->socket_connection); + gst_rtmp_connection_set_socket_connection (client->connection, + client->socket_connection); gst_rtmp_connection_start_handshake (client->connection, FALSE); g_simple_async_result_complete (client->async); diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index c4277c5..1529ed4 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -198,13 +198,22 @@ gst_rtmp_connection_finalize (GObject * object) } GstRtmpConnection * -gst_rtmp_connection_new (GSocketConnection * connection) +gst_rtmp_connection_new (void) { GstRtmpConnection *sc; - GInputStream *is; sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); - sc->connection = connection; + + return sc; +} + +void +gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc, + GSocketConnection * connection) +{ + GInputStream *is; + + sc->connection = g_object_ref (connection); is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); sc->input_source = @@ -213,8 +222,6 @@ gst_rtmp_connection_new (GSocketConnection * connection) g_source_set_callback (sc->input_source, (GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL); g_source_attach (sc->input_source, NULL); - - return sc; } static void diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index db26288..6f2a6a2 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -93,7 +93,9 @@ struct _GstRtmpConnectionClass GType gst_rtmp_connection_get_type (void); -GstRtmpConnection *gst_rtmp_connection_new (GSocketConnection *connection); +GstRtmpConnection *gst_rtmp_connection_new (void); +void gst_rtmp_connection_set_socket_connection ( + GstRtmpConnection *rtmpconnection, GSocketConnection *connection); void gst_rtmp_connection_start_handshake (GstRtmpConnection *connection, gboolean is_server); diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c index 7bb2854..41cd93e 100644 --- a/rtmp/rtmpserver.c +++ b/rtmp/rtmpserver.c @@ -182,7 +182,8 @@ gst_rtmp_server_incoming (GSocketService * service, GST_INFO ("client connected"); g_object_ref (socket_connection); - connection = gst_rtmp_connection_new (socket_connection); + connection = gst_rtmp_connection_new (); + gst_rtmp_connection_set_socket_connection (connection, socket_connection); gst_rtmp_server_add_connection (rtmpserver, connection); gst_rtmp_connection_start_handshake (connection, TRUE); |