diff options
author | David Schleef <ds@schleef.org> | 2014-08-27 11:49:00 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-27 11:49:00 -0700 |
commit | 72a3c214c085dda1b8ab65ad7460697498b088dd (patch) | |
tree | 45ee2a54c64f088b90d571dc24a2ccd0d84794f8 | |
parent | cbf9572bc7d54b43b77f14b28de59f7bd80dd7d5 (diff) |
hacking
-rw-r--r-- | rtmp/amf.c | 216 | ||||
-rw-r--r-- | rtmp/amf.h | 15 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 1 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 29 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 9 | ||||
-rw-r--r-- | tools/client-test.c | 77 | ||||
-rw-r--r-- | tools/proxy-server.c | 3 |
7 files changed, 332 insertions, 18 deletions
@@ -21,6 +21,7 @@ #include "config.h" #endif +#include <string.h> #include <gst/gst.h> #include "amf.h" @@ -41,10 +42,21 @@ struct _AmfParser gboolean error; }; +typedef struct _AmfSerializer AmfSerializer; +struct _AmfSerializer +{ + guint8 *data; + gsize size; + int offset; + gboolean error; +}; + static char *_parse_utf8_string (AmfParser * parser); static void _parse_object (AmfParser * parser, GstAmfNode * node); static GstAmfNode *_parse_value (AmfParser * parser); static void amf_object_field_free (AmfObjectField * field); +static void _serialize_object (AmfSerializer * serializer, GstAmfNode * node); +static void _serialize_value (AmfSerializer * serializer, GstAmfNode * node); GstAmfNode * @@ -127,8 +139,8 @@ _parse_array (AmfParser * parser, int size) } #endif -static int -_parse_double (AmfParser * parser) +static double +_parse_number (AmfParser * parser) { double d; int i; @@ -214,7 +226,7 @@ _parse_value (AmfParser * parser) switch (type) { case GST_AMF_TYPE_NUMBER: - gst_amf_node_set_double (node, _parse_double (parser)); + gst_amf_node_set_number (node, _parse_number (parser)); break; case GST_AMF_TYPE_BOOLEAN: gst_amf_node_set_boolean (node, _parse_u8 (parser)); @@ -266,7 +278,7 @@ gst_amf_node_set_boolean (GstAmfNode * node, gboolean val) } void -gst_amf_node_set_double (GstAmfNode * node, double val) +gst_amf_node_set_number (GstAmfNode * node, double val) { g_return_if_fail (node->type == GST_AMF_TYPE_NUMBER); node->double_val = val; @@ -287,7 +299,8 @@ gst_amf_node_set_string_take (GstAmfNode * node, char *s) } void -gst_amf_object_append_take (GstAmfNode * node, char *s, GstAmfNode * child_node) +gst_amf_object_append_take (GstAmfNode * node, const char *s, + GstAmfNode * child_node) { AmfObjectField *field; @@ -295,7 +308,7 @@ gst_amf_object_append_take (GstAmfNode * node, char *s, GstAmfNode * child_node) node->type == GST_AMF_TYPE_ECMA_ARRAY); field = g_malloc0 (sizeof (AmfObjectField)); - field->name = s; + field->name = g_strdup (s); field->value = child_node; g_ptr_array_add (node->array_val, field); } @@ -315,6 +328,29 @@ gst_amf_node_set_ecma_array (GstAmfNode * node, guint8 * data, int size) node->int_val = size; } +void +gst_amf_object_set_number (GstAmfNode * node, const char *field_name, + double val) +{ + GstAmfNode *child_node; + + child_node = gst_amf_node_new (GST_AMF_TYPE_NUMBER); + gst_amf_node_set_number (child_node, val); + gst_amf_object_append_take (node, field_name, child_node); +} + +void +gst_amf_object_set_string (GstAmfNode * node, const char *field_name, + const char *s) +{ + GstAmfNode *child_node; + + child_node = gst_amf_node_new (GST_AMF_TYPE_STRING); + gst_amf_node_set_string (child_node, s); + gst_amf_object_append_take (node, field_name, child_node); +} + + static void _gst_amf_node_dump (GstAmfNode * node, int indent) { @@ -361,3 +397,171 @@ gst_amf_node_dump (GstAmfNode * node) _gst_amf_node_dump (node, 0); g_print ("\n"); } + +static gboolean +_serialize_check (AmfSerializer * serializer, int value) +{ + if (serializer->offset + value > serializer->size) { + serializer->error = TRUE; + } + return !serializer->error; +} + +static void +_serialize_u8 (AmfSerializer * serializer, int value) +{ + if (_serialize_check (serializer, 1)) { + serializer->data[serializer->offset] = value; + serializer->offset++; + } +} + +static void +_serialize_u16 (AmfSerializer * serializer, int value) +{ + if (_serialize_check (serializer, 2)) { + serializer->data[serializer->offset] = (value >> 8) & 0xff; + serializer->data[serializer->offset + 1] = value & 0xff; + serializer->offset += 2; + } +} + +#if 0 +static void +_serialize_u24 (AmfSerializer * serializer, int value) +{ + if (_serialize_check (serializer, 3)) { + serializer->data[serializer->offset] = (value >> 16) & 0xff; + serializer->data[serializer->offset + 1] = (value >> 8) & 0xff; + serializer->data[serializer->offset + 2] = value & 0xff; + serializer->offset += 3; + } +} +#endif + +static void +_serialize_u32 (AmfSerializer * serializer, int value) +{ + if (_serialize_check (serializer, 4)) { + serializer->data[serializer->offset] = (value >> 24) & 0xff; + serializer->data[serializer->offset + 1] = (value >> 16) & 0xff; + serializer->data[serializer->offset + 2] = (value >> 8) & 0xff; + serializer->data[serializer->offset + 3] = value & 0xff; + serializer->offset += 4; + } +} + +static void +_serialize_number (AmfSerializer * serializer, double value) +{ + if (_serialize_check (serializer, 8)) { + guint8 *d_ptr = (guint8 *) & value; + int i; + + for (i = 0; i < 8; i++) { + serializer->data[serializer->offset + i] = d_ptr[7 - i]; + } + serializer->offset += 8; + } +} + +static void +_serialize_utf8_string (AmfSerializer * serializer, const char *s) +{ + int size; + + size = strlen (s); + if (_serialize_check (serializer, 2 + size)) { + serializer->data[serializer->offset] = (size >> 8) & 0xff; + serializer->data[serializer->offset + 1] = size & 0xff; + memcpy (serializer->data + serializer->offset + 2, s, size); + serializer->offset += 2 + size; + } +} + +static void +_serialize_object (AmfSerializer * serializer, GstAmfNode * node) +{ + int i; + + for (i = 0; i < node->array_val->len; i++) { + AmfObjectField *field = g_ptr_array_index (node->array_val, i); + _serialize_utf8_string (serializer, field->name); + _serialize_value (serializer, field->value); + } + _serialize_u16 (serializer, 0); + _serialize_u8 (serializer, GST_AMF_TYPE_OBJECT_END); +} + +static void +_serialize_ecma_array (AmfSerializer * serializer, GstAmfNode * node) +{ + int i; + + _serialize_u32 (serializer, 0); + for (i = 0; i < node->array_val->len; i++) { + AmfObjectField *field = g_ptr_array_index (node->array_val, i); + _serialize_utf8_string (serializer, field->name); + _serialize_value (serializer, field->value); + } + _serialize_u16 (serializer, 0); + _serialize_u8 (serializer, GST_AMF_TYPE_OBJECT_END); +} + +static void +_serialize_value (AmfSerializer * serializer, GstAmfNode * node) +{ + _serialize_u8 (serializer, node->type); + switch (node->type) { + case GST_AMF_TYPE_NUMBER: + _serialize_number (serializer, node->double_val); + break; + case GST_AMF_TYPE_BOOLEAN: + _serialize_u8 (serializer, ! !node->int_val); + break; + case GST_AMF_TYPE_STRING: + _serialize_utf8_string (serializer, node->string_val); + break; + case GST_AMF_TYPE_OBJECT: + _serialize_object (serializer, node); + break; + case GST_AMF_TYPE_MOVIECLIP: + GST_ERROR ("unimplemented AMF type: movie clip"); + serializer->error = TRUE; + break; + case GST_AMF_TYPE_NULL: + break; + case GST_AMF_TYPE_ECMA_ARRAY: + _serialize_ecma_array (serializer, node); + break; + case GST_AMF_TYPE_OBJECT_END: + break; + default: + GST_ERROR ("unimplemented AMF type %d", node->type); + serializer->error = TRUE; + break; + } +} + +GBytes * +gst_amf_serialize_command (const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args) +{ + AmfSerializer _s = { 0 }, *serializer = &_s; + + serializer->size = 4096; + serializer->data = g_malloc (serializer->size); + + _serialize_utf8_string (serializer, command_name); + _serialize_number (serializer, transaction_id); + _serialize_object (serializer, command_object); + if (optional_args) + _serialize_object (serializer, optional_args); + + if (serializer->error) { + GST_ERROR ("failed to serialize"); + g_free (serializer->data); + return NULL; + } + return g_bytes_new_take (serializer->data, serializer->offset); +} @@ -59,15 +59,24 @@ GstAmfNode * gst_amf_node_new (GstAmfType type); void gst_amf_node_free (GstAmfNode *node); void gst_amf_node_dump (GstAmfNode *node); -GstAmfNode * gst_amf_node_new_parse (const guint8 *data, gsize size, gsize *n_bytes); +GstAmfNode * gst_amf_node_new_parse (const guint8 *data, gsize size, + gsize *n_bytes); void gst_amf_node_set_boolean (GstAmfNode *node, gboolean val); -void gst_amf_node_set_double (GstAmfNode *node, double val); +void gst_amf_node_set_number (GstAmfNode *node, double val); void gst_amf_node_set_string (GstAmfNode *node, const char *s); void gst_amf_node_set_string_take (GstAmfNode *node, char *s); void gst_amf_node_set_ecma_array (GstAmfNode *node, guint8 *data, int size); -void gst_amf_object_append_take (GstAmfNode *node, char *s, GstAmfNode *child_node); +void gst_amf_object_append_take (GstAmfNode *node, const char *s, + GstAmfNode *child_node); +void gst_amf_object_set_number (GstAmfNode *node, const char *field_name, + double val); +void gst_amf_object_set_string (GstAmfNode *node, const char *field_name, + const char *s); + +GBytes * gst_amf_serialize_command (const char *command_name, + int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args); G_END_DECLS diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 3660f02..2fa75ca 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -64,7 +64,6 @@ struct _GstRtmpChunk guint32 info; GBytes *payload; - gpointer priv; }; struct _GstRtmpChunkClass diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index f217f22..5ac713a 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -530,13 +530,16 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) GST_DEBUG ("got protocol control message, type: %d", entry->chunk->message_type_id); gst_rtmp_connection_handle_pcm (sc, entry->chunk); - g_object_unref (entry->chunk); + g_signal_emit_by_name (sc, "got-control-chunk", entry->chunk); + } else if (entry->chunk->message_type_id == 0x14) { + /* FIXME parse command */ + g_signal_emit_by_name (sc, "got-command", entry->chunk); } else { GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", entry->chunk->message_length); g_signal_emit_by_name (sc, "got-chunk", entry->chunk); - g_object_unref (entry->chunk); } + g_object_unref (entry->chunk); entry->chunk = NULL; entry->offset = 0; @@ -597,7 +600,6 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); - chunk->priv = connection; g_queue_push_tail (connection->output_queue, chunk); gst_rtmp_connection_start_output (connection); } @@ -764,3 +766,24 @@ gst_rtmp_connection_dump (GstRtmpConnection * connection) g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes); } + +int +gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args) +{ + GstRtmpChunk *chunk; + + chunk = gst_rtmp_chunk_new (); + chunk->stream_id = stream_id; + chunk->timestamp = 0; /* FIXME */ + chunk->message_type_id = 0x14; + chunk->info = 0; /* FIXME */ + + chunk->payload = gst_amf_serialize_command (command_name, transaction_id, + command_object, optional_args); + + gst_rtmp_connection_queue_chunk (connection, chunk); + + return transaction_id; +} diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index c00ddee..aa2b718 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -22,6 +22,7 @@ #include <gio/gio.h> #include <rtmp/rtmpchunk.h> +#include <rtmp/amf.h> G_BEGIN_DECLS @@ -76,6 +77,9 @@ struct _GstRtmpConnectionClass /* signals */ void (*got_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk); + void (*got_command) (GstRtmpConnection *connection, + const char *command_name, int transaction_id, + GstAmfNode *command_object, GstAmfNode *option_args); }; GType gst_rtmp_connection_get_type (void); @@ -88,6 +92,11 @@ void gst_rtmp_connection_queue_chunk (GstRtmpConnection *connection, GstRtmpChunk *chunk); void gst_rtmp_connection_dump (GstRtmpConnection *connection); +int gst_rtmp_connection_send_command (GstRtmpConnection *connection, + int stream_id, const char *command_name, int transaction_id, + GstAmfNode *command_object, GstAmfNode *optional_args); + + G_END_DECLS #endif diff --git a/tools/client-test.c b/tools/client-test.c index 1d8b0ec..7a9efbe 100644 --- a/tools/client-test.c +++ b/tools/client-test.c @@ -25,18 +25,28 @@ #include <gio/gio.h> #include <stdlib.h> #include "rtmpclient.h" +#include "rtmputils.h" + #define GETTEXT_PACKAGE NULL +GstRtmpClient *client; +GstRtmpConnection *connection; + gboolean verbose; +gboolean dump; static GOptionEntry entries[] = { {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL}, + {"dump", 'd', 0, G_OPTION_ARG_NONE, &dump, "Dump packets", NULL}, {NULL} }; -static void -connect_done (GObject * source, GAsyncResult * result, gpointer user_data); +static void connect_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void send_connect (void); +static void got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data); int main (int argc, char *argv[]) @@ -44,7 +54,6 @@ main (int argc, char *argv[]) GError *error = NULL; GOptionContext *context; GMainLoop *main_loop; - GstRtmpClient *client; GCancellable *cancellable; context = g_option_context_new ("- FIXME"); @@ -58,6 +67,8 @@ main (int argc, char *argv[]) client = gst_rtmp_client_new (); + gst_rtmp_client_set_server_address (client, + "ec2-54-189-67-158.us-west-2.compute.amazonaws.com"); cancellable = g_cancellable_new (); main_loop = g_main_loop_new (NULL, TRUE); @@ -83,4 +94,64 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data) } GST_ERROR ("got here"); + + connection = gst_rtmp_client_get_connection (client); + g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL); + + send_connect (); +} + +static void +send_connect (void) +{ + GstAmfNode *node; + + node = gst_amf_node_new (GST_AMF_TYPE_OBJECT); + gst_amf_object_set_string (node, "app", "live"); + gst_amf_object_set_string (node, "type", "nonprivate"); + gst_amf_object_set_string (node, "tcUrl", "rtmp://localhost:1935/live"); + gst_rtmp_connection_send_command (connection, 3, "connect", 1, node, NULL); +} + +static void +dump_command (GstRtmpChunk * chunk) +{ + GstAmfNode *amf; + gsize size; + const guint8 *data; + gsize n_parsed; + int offset; + + offset = 0; + data = g_bytes_get_data (chunk->payload, &size); + while (offset < size) { + amf = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + gst_amf_node_dump (amf); + gst_amf_node_free (amf); + offset += n_parsed; + } +} + +static void +dump_chunk (GstRtmpChunk * chunk, gboolean dir) +{ + if (!dump) + return; + + g_print ("%s stream_id:%-4d ts:%-8d len:%-6" G_GSIZE_FORMAT + " type_id:%-4d info:%08x\n", dir ? ">>>" : "<<<", + chunk->stream_id, + chunk->timestamp, + chunk->message_length, chunk->message_type_id, chunk->info); + if (chunk->stream_id == 3 && chunk->message_type_id == 20) { + dump_command (chunk); + } + gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk)); +} + +static void +got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data) +{ + dump_chunk (chunk, TRUE); } diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 6a77377..0ab122a 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -219,8 +219,7 @@ dump_chunk (GstRtmpChunk * chunk, gboolean dir) chunk->stream_id, chunk->timestamp, chunk->message_length, chunk->message_type_id, chunk->info); - if (chunk->stream_id == 3 && - chunk->message_type_id == 20) { + if (chunk->stream_id == 3 && chunk->message_type_id == 20) { dump_command (chunk); } gst_rtmp_dump_data (gst_rtmp_chunk_get_payload (chunk)); |