summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-27 11:49:00 -0700
committerDavid Schleef <ds@schleef.org>2014-08-27 11:49:00 -0700
commit72a3c214c085dda1b8ab65ad7460697498b088dd (patch)
tree45ee2a54c64f088b90d571dc24a2ccd0d84794f8
parentcbf9572bc7d54b43b77f14b28de59f7bd80dd7d5 (diff)
hacking
-rw-r--r--rtmp/amf.c216
-rw-r--r--rtmp/amf.h15
-rw-r--r--rtmp/rtmpchunk.h1
-rw-r--r--rtmp/rtmpconnection.c29
-rw-r--r--rtmp/rtmpconnection.h9
-rw-r--r--tools/client-test.c77
-rw-r--r--tools/proxy-server.c3
7 files changed, 332 insertions, 18 deletions
diff --git a/rtmp/amf.c b/rtmp/amf.c
index c295d1a..a510e67 100644
--- a/rtmp/amf.c
+++ b/rtmp/amf.c
@@ -21,6 +21,7 @@
#include "config.h"
#endif
+#include <string.h>
#include <gst/gst.h>
#include "amf.h"
@@ -41,10 +42,21 @@ struct _AmfParser
gboolean error;
};
+typedef struct _AmfSerializer AmfSerializer;
+struct _AmfSerializer
+{
+ guint8 *data;
+ gsize size;
+ int offset;
+ gboolean error;
+};
+
static char *_parse_utf8_string (AmfParser * parser);
static void _parse_object (AmfParser * parser, GstAmfNode * node);
static GstAmfNode *_parse_value (AmfParser * parser);
static void amf_object_field_free (AmfObjectField * field);
+static void _serialize_object (AmfSerializer * serializer, GstAmfNode * node);
+static void _serialize_value (AmfSerializer * serializer, GstAmfNode * node);
GstAmfNode *
@@ -127,8 +139,8 @@ _parse_array (AmfParser * parser, int size)
}
#endif
-static int
-_parse_double (AmfParser * parser)
+static double
+_parse_number (AmfParser * parser)
{
double d;
int i;
@@ -214,7 +226,7 @@ _parse_value (AmfParser * parser)
switch (type) {
case GST_AMF_TYPE_NUMBER:
- gst_amf_node_set_double (node, _parse_double (parser));
+ gst_amf_node_set_number (node, _parse_number (parser));
break;
case GST_AMF_TYPE_BOOLEAN:
gst_amf_node_set_boolean (node, _parse_u8 (parser));
@@ -266,7 +278,7 @@ gst_amf_node_set_boolean (GstAmfNode * node, gboolean val)
}
void
-gst_amf_node_set_double (GstAmfNode * node, double val)
+gst_amf_node_set_number (GstAmfNode * node, double val)
{
g_return_if_fail (node->type == GST_AMF_TYPE_NUMBER);
node->double_val = val;
@@ -287,7 +299,8 @@ gst_amf_node_set_string_take (GstAmfNode * node, char *s)
}
void
-gst_amf_object_append_take (GstAmfNode * node, char *s, GstAmfNode * child_node)
+gst_amf_object_append_take (GstAmfNode * node, const char *s,
+ GstAmfNode * child_node)
{
AmfObjectField *field;
@@ -295,7 +308,7 @@ gst_amf_object_append_take (GstAmfNode * node, char *s, GstAmfNode * child_node)
node->type == GST_AMF_TYPE_ECMA_ARRAY);
field = g_malloc0 (sizeof (AmfObjectField));
- field->name = s;
+ field->name = g_strdup (s);
field->value = child_node;
g_ptr_array_add (node->array_val, field);
}
@@ -315,6 +328,29 @@ gst_amf_node_set_ecma_array (GstAmfNode * node, guint8 * data, int size)
node->int_val = size;
}
+void
+gst_amf_object_set_number (GstAmfNode * node, const char *field_name,
+ double val)
+{
+ GstAmfNode *child_node;
+
+ child_node = gst_amf_node_new (GST_AMF_TYPE_NUMBER);
+ gst_amf_node_set_number (child_node, val);
+ gst_amf_object_append_take (node, field_name, child_node);
+}
+
+void
+gst_amf_object_set_string (GstAmfNode * node, const char *field_name,
+ const char *s)
+{
+ GstAmfNode *child_node;
+
+ child_node = gst_amf_node_new (GST_AMF_TYPE_STRING);
+ gst_amf_node_set_string (child_node, s);
+ gst_amf_object_append_take (node, field_name, child_node);
+}
+
+
static void
_gst_amf_node_dump (GstAmfNode * node, int indent)
{
@@ -361,3 +397,171 @@ gst_amf_node_dump (GstAmfNode * node)
_gst_amf_node_dump (node, 0);
g_print ("\n");
}
+
+static gboolean
+_serialize_check (AmfSerializer * serializer, int value)
+{
+ if (serializer->offset + value > serializer->size) {
+ serializer->error = TRUE;
+ }
+ return !serializer->error;
+}
+
+static void
+_serialize_u8 (AmfSerializer * serializer, int value)
+{
+ if (_serialize_check (serializer, 1)) {
+ serializer->data[serializer->offset] = value;
+ serializer->offset++;
+ }
+}
+
+static void
+_serialize_u16 (AmfSerializer * serializer, int value)
+{
+ if (_serialize_check (serializer, 2)) {
+ serializer->data[serializer->offset] = (value >> 8) & 0xff;
+ serializer->data[serializer->offset + 1] = value & 0xff;
+ serializer->offset += 2;
+ }
+}
+
+#if 0
+static void
+_serialize_u24 (AmfSerializer * serializer, int value)
+{
+ if (_serialize_check (serializer, 3)) {
+ serializer->data[serializer->offset] = (value >> 16) & 0xff;
+ serializer->data[serializer->offset + 1] = (value >> 8) & 0xff;
+ serializer->data[serializer->offset + 2] = value & 0xff;
+ serializer->offset += 3;
+ }
+}
+#endif
+
+static void
+_serialize_u32 (AmfSerializer * serializer, int value)
+{
+ if (_serialize_check (serializer, 4)) {
+ serializer->data[serializer->offset] = (value >> 24) & 0xff;
+ serializer->data[serializer->offset + 1] = (value >> 16) & 0xff;
+ serializer->data[serializer->offset + 2] = (value >> 8) & 0xff;
+ serializer->data[serializer->offset + 3] = value & 0xff;
+ serializer->offset += 4;
+ }
+}
+
+static void
+_serialize_number (AmfSerializer * serializer, double value)
+{
+ if (_serialize_check (serializer, 8)) {
+ guint8 *d_ptr = (guint8 *) & value;
+ int i;
+
+ for (i = 0; i < 8; i++) {
+ serializer->data[serializer->offset + i] = d_ptr[7 - i];
+ }
+ serializer->offset += 8;
+ }
+}
+
+static void
+_serialize_utf8_string (AmfSerializer * serializer, const char *s)
+{
+ int size;
+
+ size = strlen (s);
+ if (_serialize_check (serializer, 2 + size)) {
+ serializer->data[serializer->offset] = (size >> 8) & 0xff;
+ serializer->data[serializer->offset + 1] = size & 0xff;
+ memcpy (serializer->data + serializer->offset + 2, s, size);
+ serializer->offset += 2 + size;
+ }
+}
+
+static void
+_serialize_object (AmfSerializer * serializer, GstAmfNode * node)
+{
+ int i;
+
+ for (i = 0; i < node->array_val->len; i++) {
+ AmfObjectField *field = g_ptr_array_index (node->array_val, i);
+ _serialize_utf8_string (serializer, field->name);
+ _serialize_value (serializer, field->value);
+ }
+ _serialize_u16 (serializer, 0);
+ _serialize_u8 (serializer, GST_AMF_TYPE_OBJECT_END);
+}
+
+static void
+_serialize_ecma_array (AmfSerializer * serializer, GstAmfNode * node)
+{
+ int i;
+
+ _serialize_u32 (serializer, 0);
+ for (i = 0; i < node->array_val->len; i++) {
+ AmfObjectField *field = g_ptr_array_index (node->array_val, i);
+ _serialize_utf8_string (serializer, field->name);
+ _serialize_value (serializer, field->value);
+ }
+ _serialize_u16 (serializer, 0);
+ _serialize_u8 (serializer, GST_AMF_TYPE_OBJECT_END);
+}
+
+static void
+_serialize_value (AmfSerializer * serializer, GstAmfNode * node)
+{
+ _serialize_u8 (serializer, node->type);
+ switch (node->type) {
+ case GST_AMF_TYPE_NUMBER:
+ _serialize_number (serializer, node->double_val);
+ break;
+ case GST_AMF_TYPE_BOOLEAN:
+ _serialize_u8 (serializer, ! !node->int_val);
+ break;
+ case GST_AMF_TYPE_STRING:
+ _serialize_utf8_string (serializer, node->string_val);
+ break;
+ case GST_AMF_TYPE_OBJECT:
+ _serialize_object (serializer, node);
+ break;
+ case GST_AMF_TYPE_MOVIECLIP:
+ GST_ERROR ("unimplemented AMF type: movie clip");
+ serializer->error = TRUE;
+ break;
+ case GST_AMF_TYPE_NULL:
+ break;
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ _serialize_ecma_array (serializer, node);
+ break;
+ case GST_AMF_TYPE_OBJECT_END:
+ break;
+ default:
+ GST_ERROR ("unimplemented AMF type %d", node->type);
+ serializer->error = TRUE;
+ break;
+ }
+}
+
+GBytes *
+gst_amf_serialize_command (const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args)
+{
+ AmfSerializer _s = { 0 }, *serializer = &_s;
+
+ serializer->size = 4096;
+ serializer->data = g_malloc (serializer->size);
+
+ _serialize_utf8_string (serializer, command_name);
+ _serialize_number (serializer, transaction_id);
+ _serialize_object (serializer, command_object);
+ if (optional_args)
+ _serialize_object (serializer, optional_args);
+
+ if (serializer->error) {
+ GST_ERROR ("failed to serialize");
+ g_free (serializer->data);
+ return NULL;
+ }
+ return g_bytes_new_take (serializer->data, serializer->offset);
+}
diff --git a/rtmp/amf.h b/rtmp/amf.h
index c782e06..0ace7d3 100644
--- a/rtmp/amf.h
+++ b/rtmp/amf.h
@@ -59,15 +59,24 @@ GstAmfNode * gst_amf_node_new (GstAmfType type);
void gst_amf_node_free (GstAmfNode *node);
void gst_amf_node_dump (GstAmfNode *node);
-GstAmfNode * gst_amf_node_new_parse (const guint8 *data, gsize size, gsize *n_bytes);
+GstAmfNode * gst_amf_node_new_parse (const guint8 *data, gsize size,
+ gsize *n_bytes);
void gst_amf_node_set_boolean (GstAmfNode *node, gboolean val);
-void gst_amf_node_set_double (GstAmfNode *node, double val);
+void gst_amf_node_set_number (GstAmfNode *node, double val);
void gst_amf_node_set_string (GstAmfNode *node, const char *s);
void gst_amf_node_set_string_take (GstAmfNode *node, char *s);
void gst_amf_node_set_ecma_array (GstAmfNode *node, guint8 *data, int size);
-void gst_amf_object_append_take (GstAmfNode *node, char *s, GstAmfNode *child_node);
+void gst_amf_object_append_take (GstAmfNode *node, const char *s,
+ GstAmfNode *child_node);
+void gst_amf_object_set_number (GstAmfNode *node, const char *field_name,
+ double val);
+void gst_amf_object_set_string (GstAmfNode *node, const char *field_name,
+ const char *s);
+
+GBytes * gst_amf_serialize_command (const char *command_name,
+ int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args);
G_END_DECLS
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 3660f02..2fa75ca 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -64,7 +64,6 @@ struct _GstRtmpChunk
guint32 info;
GBytes *payload;
- gpointer priv;
};
struct _GstRtmpChunkClass
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index f217f22..5ac713a 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -530,13 +530,16 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
GST_DEBUG ("got protocol control message, type: %d",
entry->chunk->message_type_id);
gst_rtmp_connection_handle_pcm (sc, entry->chunk);
- g_object_unref (entry->chunk);
+ g_signal_emit_by_name (sc, "got-control-chunk", entry->chunk);
+ } else if (entry->chunk->message_type_id == 0x14) {
+ /* FIXME parse command */
+ g_signal_emit_by_name (sc, "got-command", entry->chunk);
} else {
GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes",
entry->chunk->message_length);
g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
- g_object_unref (entry->chunk);
}
+ g_object_unref (entry->chunk);
entry->chunk = NULL;
entry->offset = 0;
@@ -597,7 +600,6 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
- chunk->priv = connection;
g_queue_push_tail (connection->output_queue, chunk);
gst_rtmp_connection_start_output (connection);
}
@@ -764,3 +766,24 @@ gst_rtmp_connection_dump (GstRtmpConnection * connection)
g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes);
}
+
+int
+gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args)
+{
+ GstRtmpChunk *chunk;
+
+ chunk = gst_rtmp_chunk_new ();
+ chunk->stream_id = stream_id;
+ chunk->timestamp = 0; /* FIXME */
+ chunk->message_type_id = 0x14;
+ chunk->info = 0; /* FIXME */
+
+ chunk->payload = gst_amf_serialize_command (command_name, transaction_id,
+ command_object, optional_args);
+
+ gst_rtmp_connection_queue_chunk (connection, chunk);
+
+ return transaction_id;
+}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index c00ddee..aa2b718 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -22,6 +22,7 @@
#include <gio/gio.h>
#include <rtmp/rtmpchunk.h>
+#include <rtmp/amf.h>
G_BEGIN_DECLS
@@ -76,6 +77,9 @@ struct _GstRtmpConnectionClass
/* signals */
void (*got_chunk) (GstRtmpConnection *connection,
GstRtmpChunk *chunk);
+ void (*got_command) (GstRtmpConnection *connection,
+ const char *command_name, int transaction_id,
+ GstAmfNode *command_object, GstAmfNode *option_args);
};
GType gst_rtmp_connection_get_type (void);
@@ -88,6 +92,11 @@ void gst_rtmp_connection_queue_chunk (GstRtmpConnection *connection,
GstRtmpChunk *chunk);
void gst_rtmp_connection_dump (GstRtmpConnection *connection);
+int gst_rtmp_connection_send_command (GstRtmpConnection *connection,
+ int stream_id, const char *command_name, int transaction_id,
+ GstAmfNode *command_object, GstAmfNode *optional_args);
+
+
G_END_DECLS
#endif
diff --git a/tools/client-test.c b/tools/client-test.c
index 1d8b0ec..7a9efbe 100644
--- a/tools/client-test.c
+++ b/tools/client-test.c
@@ -25,18 +25,28 @@
#include <gio/gio.h>
#include <stdlib.h>
#include "rtmpclient.h"
+#include "rtmputils.h"
+
#define GETTEXT_PACKAGE NULL
+GstRtmpClient *client;
+GstRtmpConnection *connection;
+
gboolean verbose;
+gboolean dump;
static GOptionEntry entries[] = {
{"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL},
+ {"dump", 'd', 0, G_OPTION_ARG_NONE, &dump, "Dump packets", NULL},
{NULL}
};
-static void
-connect_done (GObject * source, GAsyncResult * result, gpointer user_data);
+static void connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void send_connect (void);
+static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data);
int
main (int argc, char *argv[])
@@ -44,7 +54,6 @@ main (int argc, char *argv[])
GError *error = NULL;
GOptionContext *context;
GMainLoop *main_loop;
- GstRtmpClient *client;
GCancellable *cancellable;
context = g_option_context_new ("- FIXME");
@@ -58,6 +67,8 @@ main (int argc, char *argv[])
client = gst_rtmp_client_new ();
+ gst_rtmp_client_set_server_address (client,
+ "ec2-54-189-67-158.us-west-2.compute.amazonaws.com");
cancellable = g_cancellable_new ();
main_loop = g_main_loop_new (NULL, TRUE);
@@ -83,4 +94,64 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
}
GST_ERROR ("got here");
+
+ connection = gst_rtmp_client_get_connection (client);
+ g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL);
+
+ send_connect ();
+}
+
+static void
+send_connect (void)
+{
+ GstAmfNode *node;
+
+ node = gst_amf_node_new (GST_AMF_TYPE_OBJECT);
+ gst_amf_object_set_string (node, "app", "live");
+ gst_amf_object_set_string (node, "type", "nonprivate");
+ gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live");
+ gst_rtmp_connection_send_command (connection, 3, "connect", 1, node, NULL);
+}
+
+static void
+dump_command (GstRtmpChunk * chunk)
+{
+ GstAmfNode *amf;
+ gsize size;
+ const guint8 *data;
+ gsize n_parsed;
+ int offset;
+
+ offset = 0;
+ data = g_bytes_get_data (chunk->payload, &size);
+ while (offset < size) {
+ amf = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ gst_amf_node_dump (amf);
+ gst_amf_node_free (amf);
+ offset += n_parsed;
+ }
+}
+
+static void
+dump_chunk (GstRtmpChunk * chunk, gboolean dir)
+{
+ if (!dump)
+ return;
+
+ g_print ("%s stream_id:%-4d ts:%-8d len:%-6" G_GSIZE_FORMAT
+ " type_id:%-4d info:%08x\n", dir ? ">>>" : "<<<",
+ chunk->stream_id,
+ chunk->timestamp,
+ chunk->message_length, chunk->message_type_id, chunk->info);
+ if (chunk->stream_id == 3 && chunk->message_type_id == 20) {
+ dump_command (chunk);
+ }
+ gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk));
+}
+
+static void
+got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data)
+{
+ dump_chunk (chunk, TRUE);
}
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 6a77377..0ab122a 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -219,8 +219,7 @@ dump_chunk (GstRtmpChunk * chunk, gboolean dir)
chunk->stream_id,
chunk->timestamp,
chunk->message_length, chunk->message_type_id, chunk->info);
- if (chunk->stream_id == 3 &&
- chunk->message_type_id == 20) {
+ if (chunk->stream_id == 3 && chunk->message_type_id == 20) {
dump_command (chunk);
}
gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk));