summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-09-01 13:38:54 -0700
committerDavid Schleef <ds@schleef.org>2014-09-01 13:38:54 -0700
commit2002d4ddef7c0c4fe9a3701b93c0b567fb205ebc (patch)
tree6aa5f6d4e5a82f56083280361fa9624f12bc5cec
parent6fd24b0c5e6bab6a98af932cbca351b612264e04 (diff)
hacking
-rw-r--r--plugins/gstrtmp2sink.c633
-rw-r--r--plugins/gstrtmp2sink.h23
-rw-r--r--plugins/gstrtmp2src.c115
-rw-r--r--rtmp/rtmpclient.c5
-rw-r--r--rtmp/rtmpconnection.c17
-rw-r--r--rtmp/rtmpconnection.h4
-rw-r--r--rtmp/rtmpserver.c3
7 files changed, 622 insertions, 178 deletions
diff --git a/plugins/gstrtmp2sink.c b/plugins/gstrtmp2sink.c
index b6d4713..fedaa06 100644
--- a/plugins/gstrtmp2sink.c
+++ b/plugins/gstrtmp2sink.c
@@ -25,7 +25,8 @@
* <refsect2>
* <title>Example launch line</title>
* |[
- * gst-launch -v fakesrc ! rtmp2sink ! FIXME ! fakesink
+ * gst-launch -v videotestsrc ! x264enc ! flvmux ! rtmp2sink
+ * location=rtmp://server.example.com/live/myStream
* ]|
* FIXME Describe what the pipeline does.
* </refsect2>
@@ -38,53 +39,88 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include "gstrtmp2sink.h"
+#include <string.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category);
#define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category
/* prototypes */
-
+/* GObject virtual functions */
static void gst_rtmp2_sink_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_rtmp2_sink_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp2_sink_dispose (GObject * object);
static void gst_rtmp2_sink_finalize (GObject * object);
+static void gst_rtmp2_sink_uri_handler_init (gpointer g_iface,
+ gpointer iface_data);
-static GstCaps *gst_rtmp2_sink_get_caps (GstBaseSink * sink, GstCaps * filter);
-static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
-static GstCaps *gst_rtmp2_sink_fixate (GstBaseSink * sink, GstCaps * caps);
-static gboolean gst_rtmp2_sink_activate_pull (GstBaseSink * sink,
- gboolean active);
+/* GstBaseSink virtual functions */
static void gst_rtmp2_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
GstClockTime * start, GstClockTime * end);
-static gboolean gst_rtmp2_sink_propose_allocation (GstBaseSink * sink,
- GstQuery * query);
static gboolean gst_rtmp2_sink_start (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_query (GstBaseSink * sink, GstQuery * query);
static gboolean gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event);
-static GstFlowReturn gst_rtmp2_sink_wait_event (GstBaseSink * sink,
- GstEvent * event);
-static GstFlowReturn gst_rtmp2_sink_prepare (GstBaseSink * sink,
- GstBuffer * buffer);
-static GstFlowReturn gst_rtmp2_sink_prepare_list (GstBaseSink * sink,
- GstBufferList * buffer_list);
-static GstFlowReturn gst_rtmp2_sink_preroll (GstBaseSink * sink,
- GstBuffer * buffer);
static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink,
GstBuffer * buffer);
-static GstFlowReturn gst_rtmp2_sink_render_list (GstBaseSink * sink,
- GstBufferList * buffer_list);
+
+/* URI handler */
+static GstURIType gst_rtmp2_sink_uri_get_type (GType type);
+static const gchar *const *gst_rtmp2_sink_uri_get_protocols (GType type);
+static gchar *gst_rtmp2_sink_uri_get_uri (GstURIHandler * handler);
+static gboolean gst_rtmp2_sink_uri_set_uri (GstURIHandler * handler,
+ const gchar * uri, GError ** error);
+
+/* Internal API */
+static gchar *gst_rtmp2_sink_get_uri (GstRtmp2Sink * sink);
+static gboolean gst_rtmp2_sink_set_uri (GstRtmp2Sink * sink, const char *uri);
+static void gst_rtmp2_sink_task (gpointer user_data);
+static void connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void send_connect (GstRtmp2Sink * rtmp2sink);
+static void cmd_connect_done (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk, const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args,
+ gpointer user_data);
+static void send_create_stream (GstRtmp2Sink * rtmp2sink);
+static void create_stream_done (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk, const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args,
+ gpointer user_data);
+static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data);
+static void dump_command (GstRtmpChunk * chunk);
+static void dump_chunk (GstRtmpChunk * chunk, gboolean dir);
+static void send_secure_token_response (GstRtmp2Sink * rtmp2sink,
+ const char *challenge);
+
enum
{
- PROP_0
+ PROP_0,
+ PROP_LOCATION,
+ PROP_TIMEOUT,
+ PROP_SERVER_ADDRESS,
+ PROP_PORT,
+ PROP_APPLICATION,
+ PROP_STREAM,
+ PROP_SECURE_TOKEN
};
+#define DEFAULT_LOCATION "rtmp://localhost/live/myStream"
+#define DEFAULT_TIMEOUT 5
+#define DEFAULT_SERVER_ADDRESS ""
+#define DEFAULT_PORT 1935
+#define DEFAULT_APPLICATION "live"
+#define DEFAULT_STREAM "myStream"
+//#define DEFAULT_SECURE_TOKEN ""
+/* FIXME for testing only */
+#define DEFAULT_SECURE_TOKEN "4305c027c2758beb"
+
/* pad templates */
static GstStaticPadTemplate gst_rtmp2_sink_sink_template =
@@ -97,12 +133,18 @@ GST_STATIC_PAD_TEMPLATE ("sink",
/* class initialization */
+static void
+do_init (GType g_define_type_id)
+{
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtmp2_sink_uri_handler_init);
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0,
+ "debug category for rtmp2sink element");
+}
+
G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK,
- GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0,
- "debug category for rtmp2sink element"));
+ do_init (g_define_type_id))
-static void
-gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
+ static void gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
@@ -113,40 +155,66 @@ gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
gst_static_pad_template_get (&gst_rtmp2_sink_sink_template));
gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
- "FIXME Long name", "Generic", "FIXME Description",
- "FIXME <fixme@example.com>");
+ "RTMP sink element", "Sink", "Sink element for publishing RTMP streams",
+ "David Schleef <ds@schleef.org>");
gobject_class->set_property = gst_rtmp2_sink_set_property;
gobject_class->get_property = gst_rtmp2_sink_get_property;
gobject_class->dispose = gst_rtmp2_sink_dispose;
gobject_class->finalize = gst_rtmp2_sink_finalize;
- base_sink_class->get_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_get_caps);
- base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps);
- base_sink_class->fixate = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_fixate);
- base_sink_class->activate_pull =
- GST_DEBUG_FUNCPTR (gst_rtmp2_sink_activate_pull);
base_sink_class->get_times = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_get_times);
- base_sink_class->propose_allocation =
- GST_DEBUG_FUNCPTR (gst_rtmp2_sink_propose_allocation);
base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start);
base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop);
base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock);
base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop);
- base_sink_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_query);
+ if (0)
+ base_sink_class->query = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_query);
base_sink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_event);
- base_sink_class->wait_event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_wait_event);
- base_sink_class->prepare = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_prepare);
- base_sink_class->prepare_list =
- GST_DEBUG_FUNCPTR (gst_rtmp2_sink_prepare_list);
- base_sink_class->preroll = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_preroll);
base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render);
- base_sink_class->render_list = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render_list);
+
+ g_object_class_install_property (gobject_class, PROP_LOCATION,
+ g_param_spec_string ("location", "RTMP Location",
+ "Location of the RTMP url to read",
+ DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_SERVER_ADDRESS,
+ g_param_spec_string ("server-address", "RTMP Server Address",
+ "Address of RTMP server",
+ DEFAULT_SERVER_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_PORT,
+ g_param_spec_int ("port", "RTMP server port",
+ "RTMP server port (usually 1935)",
+ 1, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_APPLICATION,
+ g_param_spec_string ("application", "RTMP application",
+ "RTMP application",
+ DEFAULT_APPLICATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_STREAM,
+ g_param_spec_string ("stream", "RTMP stream",
+ "RTMP stream",
+ DEFAULT_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_SECURE_TOKEN,
+ g_param_spec_string ("secure-token", "Secure token",
+ "Secure token used for authentication",
+ DEFAULT_SECURE_TOKEN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
gst_rtmp2_sink_init (GstRtmp2Sink * rtmp2sink)
{
+ rtmp2sink->timeout = DEFAULT_TIMEOUT;
+ gst_rtmp2_sink_set_uri (rtmp2sink, DEFAULT_LOCATION);
+ rtmp2sink->secure_token = g_strdup (DEFAULT_SECURE_TOKEN);
+
+ g_mutex_init (&rtmp2sink->lock);
+ g_cond_init (&rtmp2sink->cond);
+ rtmp2sink->task = gst_task_new (gst_rtmp2_sink_task, rtmp2sink, NULL);
+ g_rec_mutex_init (&rtmp2sink->task_lock);
+ gst_task_set_lock (rtmp2sink->task, &rtmp2sink->task_lock);
+ rtmp2sink->client = gst_rtmp_client_new ();
+ rtmp2sink->connection = gst_rtmp_client_get_connection (rtmp2sink->client);
+
+ g_object_set (rtmp2sink->client, "timeout", rtmp2sink->timeout, NULL);
}
void
@@ -158,6 +226,28 @@ gst_rtmp2_sink_set_property (GObject * object, guint property_id,
GST_DEBUG_OBJECT (rtmp2sink, "set_property");
switch (property_id) {
+ case PROP_LOCATION:
+ gst_rtmp2_sink_set_uri (rtmp2sink, g_value_get_string (value));
+ break;
+ case PROP_SERVER_ADDRESS:
+ g_free (rtmp2sink->server_address);
+ rtmp2sink->server_address = g_value_dup_string (value);
+ break;
+ case PROP_PORT:
+ rtmp2sink->port = g_value_get_int (value);
+ break;
+ case PROP_APPLICATION:
+ g_free (rtmp2sink->application);
+ rtmp2sink->application = g_value_dup_string (value);
+ break;
+ case PROP_STREAM:
+ g_free (rtmp2sink->stream);
+ rtmp2sink->stream = g_value_dup_string (value);
+ break;
+ case PROP_SECURE_TOKEN:
+ g_free (rtmp2sink->secure_token);
+ rtmp2sink->secure_token = g_value_dup_string (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -173,6 +263,24 @@ gst_rtmp2_sink_get_property (GObject * object, guint property_id,
GST_DEBUG_OBJECT (rtmp2sink, "get_property");
switch (property_id) {
+ case PROP_LOCATION:
+ g_value_set_string (value, gst_rtmp2_sink_get_uri (rtmp2sink));
+ break;
+ case PROP_SERVER_ADDRESS:
+ g_value_set_string (value, rtmp2sink->server_address);
+ break;
+ case PROP_PORT:
+ g_value_set_int (value, rtmp2sink->port);
+ break;
+ case PROP_APPLICATION:
+ g_value_set_string (value, rtmp2sink->application);
+ break;
+ case PROP_STREAM:
+ g_value_set_string (value, rtmp2sink->stream);
+ break;
+ case PROP_SECURE_TOKEN:
+ g_value_set_string (value, rtmp2sink->secure_token);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -199,51 +307,24 @@ gst_rtmp2_sink_finalize (GObject * object)
GST_DEBUG_OBJECT (rtmp2sink, "finalize");
/* clean up object here */
+ g_object_unref (rtmp2sink->task);
+ g_rec_mutex_clear (&rtmp2sink->task_lock);
+ g_object_unref (rtmp2sink->client);
+ g_mutex_clear (&rtmp2sink->lock);
+ g_cond_clear (&rtmp2sink->cond);
G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object);
}
-static GstCaps *
-gst_rtmp2_sink_get_caps (GstBaseSink * sink, GstCaps * filter)
-{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
-
- GST_DEBUG_OBJECT (rtmp2sink, "get_caps");
-
- return NULL;
-}
-
-/* notify subclass of new caps */
-static gboolean
-gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
-{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
-
- GST_DEBUG_OBJECT (rtmp2sink, "set_caps");
-
- return TRUE;
-}
-
-/* fixate sink caps during pull-mode negotiation */
-static GstCaps *
-gst_rtmp2_sink_fixate (GstBaseSink * sink, GstCaps * caps)
-{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
-
- GST_DEBUG_OBJECT (rtmp2sink, "fixate");
-
- return NULL;
-}
-
-/* start or stop a pulling thread */
-static gboolean
-gst_rtmp2_sink_activate_pull (GstBaseSink * sink, gboolean active)
+static void
+gst_rtmp2_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
-
- GST_DEBUG_OBJECT (rtmp2sink, "activate_pull");
+ GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
- return TRUE;
+ iface->get_type = gst_rtmp2_sink_uri_get_type;
+ iface->get_protocols = gst_rtmp2_sink_uri_get_protocols;
+ iface->get_uri = gst_rtmp2_sink_uri_get_uri;
+ iface->set_uri = gst_rtmp2_sink_uri_set_uri;
}
/* get the start and end times for syncing on this buffer */
@@ -257,17 +338,6 @@ gst_rtmp2_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
}
-/* propose allocation parameters for upstream */
-static gboolean
-gst_rtmp2_sink_propose_allocation (GstBaseSink * sink, GstQuery * query)
-{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
-
- GST_DEBUG_OBJECT (rtmp2sink, "propose_allocation");
-
- return TRUE;
-}
-
/* start and stop processing, ideal for opening/closing the resource */
static gboolean
gst_rtmp2_sink_start (GstBaseSink * sink)
@@ -276,6 +346,8 @@ gst_rtmp2_sink_start (GstBaseSink * sink)
GST_DEBUG_OBJECT (rtmp2sink, "start");
+ gst_task_start (rtmp2sink->task);
+
return TRUE;
}
@@ -298,6 +370,11 @@ gst_rtmp2_sink_unlock (GstBaseSink * sink)
GST_DEBUG_OBJECT (rtmp2sink, "unlock");
+ rtmp2sink->reset = TRUE;
+ g_mutex_lock (&rtmp2sink->lock);
+ g_cond_signal (&rtmp2sink->cond);
+ g_mutex_unlock (&rtmp2sink->lock);
+
return TRUE;
}
@@ -311,6 +388,11 @@ gst_rtmp2_sink_unlock_stop (GstBaseSink * sink)
GST_DEBUG_OBJECT (rtmp2sink, "unlock_stop");
+ gst_task_stop (rtmp2sink->task);
+ if (rtmp2sink->task_main_loop) {
+ g_main_loop_quit (rtmp2sink->task_main_loop);
+ }
+
return TRUE;
}
@@ -336,66 +418,375 @@ gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event)
return TRUE;
}
-/* wait for eos or gap, subclasses should chain up to parent first */
static GstFlowReturn
-gst_rtmp2_sink_wait_event (GstBaseSink * sink, GstEvent * event)
+gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{
GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
+ GstRtmpChunk *chunk;
+ GBytes *bytes;
+ gsize size;
+ guint8 *data;
- GST_DEBUG_OBJECT (rtmp2sink, "wait_event");
+ GST_DEBUG_OBJECT (rtmp2sink, "render");
+
+ size = gst_buffer_get_size (buffer);
+ gst_buffer_extract_dup (buffer, 0, size, (gpointer *) & data, &size);
+
+ if (size >= 4) {
+ if (data[0] == 'F' && data[1] == 'L' && data[2] == 'V') {
+ /* drop the header, we don't need it */
+ g_free (data);
+ return GST_FLOW_OK;
+ }
+ }
+
+ if (size < 15) {
+ g_free (data);
+ return GST_FLOW_ERROR;
+ }
+
+ chunk = gst_rtmp_chunk_new ();
+ chunk->message_type_id = data[0];
+ if (chunk->message_type_id == 18) {
+ chunk->stream_id = 5;
+ } else if (chunk->message_type_id == 9) {
+ chunk->stream_id = 7;
+ } else {
+ GST_ERROR ("unknown message_type_id %d", chunk->message_type_id);
+ }
+ chunk->message_length = GST_READ_UINT24_BE (data + 1);
+ chunk->timestamp = GST_READ_UINT24_BE (data + 4);
+ chunk->info = 1; /* FIXME use actual stream id */
+
+ if (chunk->message_length != size - 15) {
+ GST_ERROR ("message length was %" G_GSIZE_FORMAT " expected %"
+ G_GSIZE_FORMAT, chunk->message_length, size - 15);
+ }
+
+ bytes = g_bytes_new_take (data, size);
+ chunk->payload = g_bytes_new_from_bytes (bytes, 11, size - 15);
+
+ if (rtmp2sink->dump) {
+ dump_chunk (chunk, TRUE);
+ }
+
+ gst_rtmp_connection_queue_chunk (rtmp2sink->connection, chunk);
return GST_FLOW_OK;
}
-/* notify subclass of buffer or list before doing sync */
-static GstFlowReturn
-gst_rtmp2_sink_prepare (GstBaseSink * sink, GstBuffer * buffer)
+
+/* URL handler */
+
+static GstURIType
+gst_rtmp2_sink_uri_get_type (GType type)
{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
+ return GST_URI_SINK;
+}
- GST_DEBUG_OBJECT (rtmp2sink, "prepare");
+static const gchar *const *
+gst_rtmp2_sink_uri_get_protocols (GType type)
+{
+ static const gchar *protocols[] = { "rtmp", NULL };
- return GST_FLOW_OK;
+ return protocols;
}
-static GstFlowReturn
-gst_rtmp2_sink_prepare_list (GstBaseSink * sink, GstBufferList * buffer_list)
+static gchar *
+gst_rtmp2_sink_uri_get_uri (GstURIHandler * handler)
{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
+ GstRtmp2Sink *sink = GST_RTMP2_SINK (handler);
- GST_DEBUG_OBJECT (rtmp2sink, "prepare_list");
+ return gst_rtmp2_sink_get_uri (sink);
+}
- return GST_FLOW_OK;
+static gboolean
+gst_rtmp2_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
+ GError ** error)
+{
+ GstRtmp2Sink *sink = GST_RTMP2_SINK (handler);
+ gboolean ret;
+
+ ret = gst_rtmp2_sink_set_uri (sink, uri);
+ if (!ret && error) {
+ *error =
+ g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
+ "Invalid URI");
+ }
+
+ return ret;
}
-/* notify subclass of preroll buffer or real buffer */
-static GstFlowReturn
-gst_rtmp2_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
+/* Internal API */
+
+static gchar *
+gst_rtmp2_sink_get_uri (GstRtmp2Sink * sink)
{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
+ if (sink->port == 1935) {
+ return g_strdup_printf ("rtmp://%s/%s/%s", sink->server_address,
+ sink->application, sink->stream);
+ } else {
+ return g_strdup_printf ("rtmp://%s:%d/%s/%s", sink->server_address,
+ sink->port, sink->application, sink->stream);
+ }
+}
+
+/* It would really be awesome if GStreamer had full URI parsing. Alas. */
+/* FIXME this function needs more error checking, and testing */
+static gboolean
+gst_rtmp2_sink_set_uri (GstRtmp2Sink * sink, const char *uri)
+{
+ gchar *location;
+ gchar **parts;
+ gchar **parts2;
+ gboolean ret;
- GST_DEBUG_OBJECT (rtmp2sink, "preroll");
+ GST_DEBUG ("setting uri to %s", uri);
- return GST_FLOW_OK;
+ if (!gst_uri_has_protocol (uri, "rtmp"))
+ return FALSE;
+
+ location = gst_uri_get_location (uri);
+
+ parts = g_strsplit (location, "/", 3);
+ if (parts[0] == NULL || parts[1] == NULL || parts[2] == NULL) {
+ ret = FALSE;
+ goto out;
+ }
+
+ parts2 = g_strsplit (parts[0], ":", 2);
+ if (parts2[1]) {
+ sink->port = g_ascii_strtoull (parts2[1], NULL, 10);
+ } else {
+ sink->port = 1935;
+ }
+ g_free (sink->server_address);
+ sink->server_address = g_strdup (parts2[0]);
+ g_strfreev (parts2);
+
+ g_free (sink->application);
+ sink->application = g_strdup (parts[1]);
+ g_free (sink->stream);
+ sink->stream = g_strdup (parts[2]);
+
+ ret = TRUE;
+
+out:
+ g_free (location);
+ g_strfreev (parts);
+ return ret;
}
-static GstFlowReturn
-gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer)
+static void
+gst_rtmp2_sink_task (gpointer user_data)
{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
+ GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data);
+ GMainLoop *main_loop;
+ GMainContext *main_context;
+
+ GST_ERROR ("gst_rtmp2_sink_task starting");
+
+ gst_rtmp_client_set_server_address (rtmp2sink->client,
+ rtmp2sink->server_address);
+ gst_rtmp_client_connect_async (rtmp2sink->client, NULL, connect_done,
+ rtmp2sink);
+
+ main_context = g_main_context_new ();
+ main_loop = g_main_loop_new (main_context, TRUE);
+ rtmp2sink->task_main_loop = main_loop;
+ g_main_loop_run (main_loop);
+ rtmp2sink->task_main_loop = NULL;
+ g_main_loop_unref (main_loop);
+ g_main_context_unref (main_context);
+
+ GST_ERROR ("gst_rtmp2_sink_task exiting");
+}
- GST_DEBUG_OBJECT (rtmp2sink, "render");
+static void
+connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
+{
+ GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data);
+ GError *error = NULL;
+ gboolean ret;
+
+ ret = gst_rtmp_client_connect_finish (rtmp2sink->client, result, &error);
+ if (!ret) {
+ GST_ELEMENT_ERROR (rtmp2sink, RESOURCE, OPEN_READ,
+ ("Could not connect to server"), ("%s", error->message));
+ g_error_free (error);
+ return;
+ }
- return GST_FLOW_OK;
+ g_signal_connect (rtmp2sink->connection, "got-chunk", G_CALLBACK (got_chunk),
+ rtmp2sink);
+
+ send_connect (rtmp2sink);
}
-/* Render a BufferList */
-static GstFlowReturn
-gst_rtmp2_sink_render_list (GstBaseSink * sink, GstBufferList * buffer_list)
+static void
+send_connect (GstRtmp2Sink * rtmp2sink)
{
- GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (sink);
+ GstAmfNode *node;
+ gchar *uri;
+
+ node = gst_amf_node_new (GST_AMF_TYPE_OBJECT);
+ gst_amf_object_set_string (node, "app", rtmp2sink->application);
+ gst_amf_object_set_string (node, "type", "nonprivate");
+ uri = gst_rtmp2_sink_get_uri (rtmp2sink);
+ gst_amf_object_set_string (node, "tcUrl", uri);
+ g_free (uri);
+ // "fpad": False,
+ // "capabilities": 15,
+ // "audioCodecs": 3191,
+ // "videoCodecs": 252,
+ // "videoFunction": 1,
+ gst_rtmp_connection_send_command (rtmp2sink->connection, 3, "connect", 1,
+ node, NULL, cmd_connect_done, rtmp2sink);
+}
+
+static void
+cmd_connect_done (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, gpointer user_data)
+{
+ GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data);
+ gboolean ret;
+
+ ret = FALSE;
+ if (optional_args) {
+ const GstAmfNode *n;
+ n = gst_amf_node_get_object (optional_args, "code");
+ if (n) {
+ const char *s;
+ s = gst_amf_node_get_string (n);
+ if (strcmp (s, "NetConnection.Connect.Success") == 0) {
+ ret = TRUE;
+ }
+ }
+ }
- GST_DEBUG_OBJECT (rtmp2sink, "render_list");
+ if (ret) {
+ const GstAmfNode *n;
+
+ GST_DEBUG ("success");
+
+ n = gst_amf_node_get_object (optional_args, "secureToken");
+ if (n) {
+ const gchar *challenge;
+ challenge = gst_amf_node_get_string (n);
+ GST_DEBUG ("secureToken challenge: %s", challenge);
+ send_secure_token_response (rtmp2sink, challenge);
+ }
+
+ send_create_stream (rtmp2sink);
+ } else {
+ GST_ERROR ("connect error");
+ }
+}
+
+static void
+send_create_stream (GstRtmp2Sink * rtmp2sink)
+{
+ GstAmfNode *node;
+
+ node = gst_amf_node_new (GST_AMF_TYPE_NULL);
+ gst_rtmp_connection_send_command (rtmp2sink->connection, 3, "createStream", 2,
+ node, NULL, create_stream_done, rtmp2sink);
+
+}
+
+static void
+create_stream_done (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, gpointer user_data)
+{
+ //GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data);
+ gboolean ret;
+ int stream_id;
+
+ ret = FALSE;
+ if (optional_args) {
+ stream_id = gst_amf_node_get_number (optional_args);
+ ret = TRUE;
+ }
+
+ if (ret) {
+ GST_DEBUG ("createStream success, stream_id=%d", stream_id);
+ //send_play (rtmp2sink);
+ } else {
+ GST_ERROR ("createStream failed");
+ }
+}
+
+static void
+got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data)
+{
+ GstRtmp2Sink *rtmp2sink = GST_RTMP2_SINK (user_data);
+
+ if (rtmp2sink->dump) {
+ dump_chunk (chunk, FALSE);
+ }
+}
+
+static void
+dump_command (GstRtmpChunk * chunk)
+{
+ GstAmfNode *amf;
+ gsize size;
+ const guint8 *data;
+ gsize n_parsed;
+ int offset;
+
+ offset = 0;
+ data = g_bytes_get_data (chunk->payload, &size);
+ while (offset < size) {
+ amf = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ gst_amf_node_dump (amf);
+ gst_amf_node_free (amf);
+ offset += n_parsed;
+ }
+}
+
+static void
+dump_chunk (GstRtmpChunk * chunk, gboolean dir)
+{
+ g_print ("%s stream_id:%-4d ts:%-8d len:%-6" G_GSIZE_FORMAT
+ " type_id:%-4d info:%08x\n", dir ? ">>>" : "<<<",
+ chunk->stream_id,
+ chunk->timestamp,
+ chunk->message_length, chunk->message_type_id, chunk->info);
+ if (chunk->message_type_id == 20) {
+ dump_command (chunk);
+ }
+ if (chunk->message_type_id == 18) {
+ dump_command (chunk);
+ }
+ gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk));
+}
+
+static void
+send_secure_token_response (GstRtmp2Sink * rtmp2sink, const char *challenge)
+{
+ GstAmfNode *node1;
+ GstAmfNode *node2;
+ gchar *response;
+
+ if (rtmp2sink->secure_token == NULL || !rtmp2sink->secure_token[0]) {
+ GST_ELEMENT_ERROR (rtmp2sink, RESOURCE, OPEN_READ,
+ ("Server requested secureToken authentication"), (NULL));
+ return;
+ }
+
+ response = gst_rtmp_tea_decode (rtmp2sink->secure_token, challenge);
+
+ GST_DEBUG ("response: %s", response);
+
+ node1 = gst_amf_node_new (GST_AMF_TYPE_NULL);
+ node2 = gst_amf_node_new (GST_AMF_TYPE_STRING);
+ gst_amf_node_set_string_take (node2, response);
+
+ gst_rtmp_connection_send_command (rtmp2sink->connection, 3,
+ "secureTokenResponse", 0, node1, node2, NULL, NULL);
- return GST_FLOW_OK;
}
diff --git a/plugins/gstrtmp2sink.h b/plugins/gstrtmp2sink.h
index bccb110..eed580d 100644
--- a/plugins/gstrtmp2sink.h
+++ b/plugins/gstrtmp2sink.h
@@ -21,6 +21,8 @@
#define _GST_RTMP2_SINK_H_
#include <gst/base/gstbasesink.h>
+#include <rtmp/rtmpclient.h>
+#include <rtmp/rtmputils.h>
G_BEGIN_DECLS
@@ -37,6 +39,27 @@ struct _GstRtmp2Sink
{
GstBaseSink base_rtmp2sink;
+ /* properties */
+ char *uri;
+ int timeout;
+ char *server_address;
+ int port;
+ char *application;
+ char *stream;
+ char *secure_token;
+
+ /* stuff */
+ GMutex lock;
+ GCond cond;
+ gboolean reset;
+ GstTask *task;
+ GRecMutex task_lock;
+ GMainLoop *task_main_loop;
+
+ GstRtmpClient *client;
+ GstRtmpConnection *connection;
+ gboolean dump;
+
};
struct _GstRtmp2SinkClass
diff --git a/plugins/gstrtmp2src.c b/plugins/gstrtmp2src.c
index 4043b92..2811733 100644
--- a/plugins/gstrtmp2src.c
+++ b/plugins/gstrtmp2src.c
@@ -46,7 +46,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
/* prototypes */
-
+/* GObject virtual functions */
static void gst_rtmp2_src_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_rtmp2_src_get_property (GObject * object,
@@ -56,7 +56,7 @@ static void gst_rtmp2_src_finalize (GObject * object);
static void gst_rtmp2_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
-static void gst_rtmp2_src_task (gpointer user_data);
+/* GstBaseSrc virtual functions */
static gboolean gst_rtmp2_src_start (GstBaseSrc * src);
static gboolean gst_rtmp2_src_stop (GstBaseSrc * src);
static void gst_rtmp2_src_get_times (GstBaseSrc * src, GstBuffer * buffer,
@@ -75,6 +75,15 @@ static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset,
static GstFlowReturn gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset,
guint size, GstBuffer ** buf);
+/* URI handler */
+static GstURIType gst_rtmp2_src_uri_get_type (GType type);
+static const gchar *const *gst_rtmp2_src_uri_get_protocols (GType type);
+static gchar *gst_rtmp2_src_uri_get_uri (GstURIHandler * handler);
+static gboolean gst_rtmp2_src_uri_set_uri (GstURIHandler * handler,
+ const gchar * uri, GError ** error);
+
+/* Internal API */
+static void gst_rtmp2_src_task (gpointer user_data);
static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data);
static void connect_done (GObject * source, GAsyncResult * result,
@@ -133,16 +142,18 @@ GST_STATIC_PAD_TEMPLATE ("src",
/* class initialization */
+static void
+do_init (GType g_define_type_id)
+{
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtmp2_src_uri_handler_init);
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
+ "debug category for rtmp2src element");
+}
+
G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC,
- do {
- G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
- gst_rtmp2_src_uri_handler_init);
- GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
- "debug category for rtmp2src element");
- } while (0));
+ do_init (g_define_type_id))
-static void
-gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
+ static void gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass);
@@ -207,6 +218,7 @@ static void
gst_rtmp2_src_init (GstRtmp2Src * rtmp2src)
{
g_mutex_init (&rtmp2src->lock);
+ g_cond_init (&rtmp2src->cond);
rtmp2src->queue = g_queue_new ();
//gst_base_src_set_live (GST_BASE_SRC(rtmp2src), TRUE);
@@ -223,45 +235,6 @@ gst_rtmp2_src_init (GstRtmp2Src * rtmp2src)
}
-static GstURIType
-gst_rtmp2_src_uri_get_type (GType type)
-{
- return GST_URI_SRC;
-}
-
-static const gchar *const *
-gst_rtmp2_src_uri_get_protocols (GType type)
-{
- static const gchar *protocols[] = { "rtmp", NULL };
-
- return protocols;
-}
-
-static gchar *
-gst_rtmp2_src_uri_get_uri (GstURIHandler * handler)
-{
- GstRtmp2Src *src = GST_RTMP2_SRC (handler);
-
- return gst_rtmp2_src_get_uri (src);
-}
-
-static gboolean
-gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
- GError ** error)
-{
- GstRtmp2Src *src = GST_RTMP2_SRC (handler);
- gboolean ret;
-
- ret = gst_rtmp2_src_set_uri (src, uri);
- if (!ret && error) {
- *error =
- g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
- "Invalid URI");
- }
-
- return ret;
-}
-
static void
gst_rtmp2_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
@@ -367,6 +340,7 @@ gst_rtmp2_src_finalize (GObject * object)
g_rec_mutex_clear (&rtmp2src->task_lock);
g_object_unref (rtmp2src->client);
g_mutex_clear (&rtmp2src->lock);
+ g_cond_clear (&rtmp2src->cond);
g_queue_free_full (rtmp2src->queue, g_object_unref);
G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object);
@@ -821,6 +795,49 @@ gst_rtmp2_src_alloc (GstBaseSrc * src, guint64 offset, guint size,
return GST_FLOW_OK;
}
+
+/* URL handler */
+
+static GstURIType
+gst_rtmp2_src_uri_get_type (GType type)
+{
+ return GST_URI_SRC;
+}
+
+static const gchar *const *
+gst_rtmp2_src_uri_get_protocols (GType type)
+{
+ static const gchar *protocols[] = { "rtmp", NULL };
+
+ return protocols;
+}
+
+static gchar *
+gst_rtmp2_src_uri_get_uri (GstURIHandler * handler)
+{
+ GstRtmp2Src *src = GST_RTMP2_SRC (handler);
+
+ return gst_rtmp2_src_get_uri (src);
+}
+
+static gboolean
+gst_rtmp2_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
+ GError ** error)
+{
+ GstRtmp2Src *src = GST_RTMP2_SRC (handler);
+ gboolean ret;
+
+ ret = gst_rtmp2_src_set_uri (src, uri);
+ if (!ret && error) {
+ *error =
+ g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
+ "Invalid URI");
+ }
+
+ return ret;
+}
+
+
/* internal API */
static gchar *
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index ecec842..ed7c0d1 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -99,6 +99,8 @@ gst_rtmp_client_init (GstRtmpClient * rtmpclient)
{
rtmpclient->server_address = g_strdup (DEFAULT_SERVER_ADDRESS);
rtmpclient->server_port = DEFAULT_SERVER_PORT;
+
+ rtmpclient->connection = gst_rtmp_connection_new ();
}
void
@@ -256,7 +258,8 @@ gst_rtmp_client_connect_done (GObject * source, GAsyncResult * result,
return;
}
- client->connection = gst_rtmp_connection_new (client->socket_connection);
+ gst_rtmp_connection_set_socket_connection (client->connection,
+ client->socket_connection);
gst_rtmp_connection_start_handshake (client->connection, FALSE);
g_simple_async_result_complete (client->async);
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index c4277c5..1529ed4 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -198,13 +198,22 @@ gst_rtmp_connection_finalize (GObject * object)
}
GstRtmpConnection *
-gst_rtmp_connection_new (GSocketConnection * connection)
+gst_rtmp_connection_new (void)
{
GstRtmpConnection *sc;
- GInputStream *is;
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
- sc->connection = connection;
+
+ return sc;
+}
+
+void
+gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
+ GSocketConnection * connection)
+{
+ GInputStream *is;
+
+ sc->connection = g_object_ref (connection);
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
sc->input_source =
@@ -213,8 +222,6 @@ gst_rtmp_connection_new (GSocketConnection * connection)
g_source_set_callback (sc->input_source,
(GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL);
g_source_attach (sc->input_source, NULL);
-
- return sc;
}
static void
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index db26288..6f2a6a2 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -93,7 +93,9 @@ struct _GstRtmpConnectionClass
GType gst_rtmp_connection_get_type (void);
-GstRtmpConnection *gst_rtmp_connection_new (GSocketConnection *connection);
+GstRtmpConnection *gst_rtmp_connection_new (void);
+void gst_rtmp_connection_set_socket_connection (
+ GstRtmpConnection *rtmpconnection, GSocketConnection *connection);
void gst_rtmp_connection_start_handshake (GstRtmpConnection *connection,
gboolean is_server);
diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c
index 7bb2854..41cd93e 100644
--- a/rtmp/rtmpserver.c
+++ b/rtmp/rtmpserver.c
@@ -182,7 +182,8 @@ gst_rtmp_server_incoming (GSocketService * service,
GST_INFO ("client connected");
g_object_ref (socket_connection);
- connection = gst_rtmp_connection_new (socket_connection);
+ connection = gst_rtmp_connection_new ();
+ gst_rtmp_connection_set_socket_connection (connection, socket_connection);
gst_rtmp_server_add_connection (rtmpserver, connection);
gst_rtmp_connection_start_handshake (connection, TRUE);