diff options
author | David Schleef <ds@schleef.org> | 2014-08-30 13:17:13 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-30 13:17:13 -0700 |
commit | cff0d57190f3f0b9c9d0c30012b959f12055da08 (patch) | |
tree | 2eadf183af3c3d6b1062eebd3f06053be16e0378 /rtmp | |
parent | bf029aebe192f30de5d79281d0b8c44f94cfd233 (diff) |
hacking
Diffstat (limited to 'rtmp')
-rw-r--r-- | rtmp/amf.c | 81 | ||||
-rw-r--r-- | rtmp/amf.h | 10 | ||||
-rw-r--r-- | rtmp/rtmpchunk.c | 63 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 5 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 1 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 144 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 21 |
7 files changed, 293 insertions, 32 deletions
@@ -552,11 +552,13 @@ gst_amf_serialize_command (const char *command_name, int transaction_id, serializer->size = 4096; serializer->data = g_malloc (serializer->size); + _serialize_u8 (serializer, GST_AMF_TYPE_STRING); _serialize_utf8_string (serializer, command_name); + _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER); _serialize_number (serializer, transaction_id); - _serialize_object (serializer, command_object); + _serialize_value (serializer, command_object); if (optional_args) - _serialize_object (serializer, optional_args); + _serialize_value (serializer, optional_args); if (serializer->error) { GST_ERROR ("failed to serialize"); @@ -565,3 +567,78 @@ gst_amf_serialize_command (const char *command_name, int transaction_id, } return g_bytes_new_take (serializer->data, serializer->offset); } + +GBytes * +gst_amf_serialize_command2 (const char *command_name, int transaction_id, + GstAmfNode * command_object, GstAmfNode * optional_args, GstAmfNode * n3, + GstAmfNode * n4) +{ + AmfSerializer _s = { 0 }, *serializer = &_s; + + serializer->size = 4096; + serializer->data = g_malloc (serializer->size); + + _serialize_u8 (serializer, GST_AMF_TYPE_STRING); + _serialize_utf8_string (serializer, command_name); + _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER); + _serialize_number (serializer, transaction_id); + _serialize_value (serializer, command_object); + if (optional_args) + _serialize_value (serializer, optional_args); + if (n3) + _serialize_value (serializer, n3); + if (n4) + _serialize_value (serializer, n4); + + if (serializer->error) { + GST_ERROR ("failed to serialize"); + g_free (serializer->data); + return NULL; + } + return g_bytes_new_take (serializer->data, serializer->offset); +} + +gboolean +gst_amf_node_get_boolean (const GstAmfNode * node) +{ + return node->int_val; +} + +const char * +gst_amf_node_get_string (const GstAmfNode * node) +{ + return node->string_val; +} + +double +gst_amf_node_get_number (const GstAmfNode * node) +{ + return node->double_val; +} + +const GstAmfNode * +gst_amf_node_get_object (const GstAmfNode * node, const char *field_name) +{ + int i; + for (i = 0; i < node->array_val->len; i++) { + AmfObjectField *field = g_ptr_array_index (node->array_val, i); + if (strcmp (field->name, field_name) == 0) { + return field->value; + } + } + return NULL; +} + +int +gst_amf_node_get_object_length (const GstAmfNode * node) +{ + return node->array_val->len; +} + +const GstAmfNode * +gst_amf_node_get_object_by_index (const GstAmfNode * node, int i) +{ + AmfObjectField *field; + field = g_ptr_array_index (node->array_val, i); + return field->value; +} @@ -70,6 +70,13 @@ void gst_amf_node_set_ecma_array (GstAmfNode *node, guint8 *data, int size); void gst_amf_object_append_take (GstAmfNode *node, const char *s, GstAmfNode *child_node); +gboolean gst_amf_node_get_boolean (const GstAmfNode *node); +const char *gst_amf_node_get_string (const GstAmfNode *node); +double gst_amf_node_get_number (const GstAmfNode *node); +const GstAmfNode *gst_amf_node_get_object (const GstAmfNode *node, const char *field_name); +int gst_amf_node_get_object_length (const GstAmfNode *node); +const GstAmfNode *gst_amf_node_get_object_by_index (const GstAmfNode *node, int i); + void gst_amf_object_set_number (GstAmfNode *node, const char *field_name, double val); void gst_amf_object_set_string (GstAmfNode *node, const char *field_name, @@ -77,6 +84,9 @@ void gst_amf_object_set_string (GstAmfNode *node, const char *field_name, GBytes * gst_amf_serialize_command (const char *command_name, int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args); +GBytes * gst_amf_serialize_command2 (const char *command_name, + int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args, + GstAmfNode *n3, GstAmfNode *n4); G_END_DECLS diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index b23fe45..6a22c9e 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -193,8 +193,8 @@ gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes, header->message_type_id = data[offset]; offset += 1; - header->info = (data[offset] << 24) | (data[offset + 1] << 16) | - (data[offset + 2] << 8) | data[offset + 3]; + header->info = (data[offset + 3] << 24) | (data[offset + 2] << 16) | + (data[offset + 1] << 8) | data[offset]; offset += 4; } else { header->timestamp = previous_header->timestamp; @@ -262,10 +262,10 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, data[5] = (chunk->message_length >> 8) & 0xff; data[6] = chunk->message_length & 0xff; data[7] = chunk->message_type_id; - data[8] = (chunk->info >> 24) & 0xff; - data[9] = (chunk->info >> 16) & 0xff; - data[10] = (chunk->info >> 8) & 0xff; - data[11] = chunk->info & 0xff; + data[8] = chunk->info & 0xff; + data[9] = (chunk->info >> 8) & 0xff; + data[10] = (chunk->info >> 16) & 0xff; + data[11] = (chunk->info >> 24) & 0xff; offset = 12; } else { data[1] = (timestamp >> 16) & 0xff; @@ -377,3 +377,54 @@ gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry, entry->previous_header.message_type_id = chunk->message_type_id; entry->previous_header.info = chunk->info; } + +gboolean +gst_rtmp_chunk_parse_message (GstRtmpChunk * chunk, char **command_name, + double *transaction_id, GstAmfNode ** command_object, + GstAmfNode ** optional_args) +{ + gsize n_parsed; + const guint8 *data; + gsize size; + int offset; + GstAmfNode *n1, *n2, *n3, *n4; + + offset = 0; + data = g_bytes_get_data (chunk->payload, &size); + n1 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + offset += n_parsed; + n2 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + offset += n_parsed; + n3 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + offset += n_parsed; + if (offset < size) { + n4 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed); + } else { + n4 = NULL; + } + + if (command_name) { + *command_name = g_strdup (gst_amf_node_get_string (n1)); + } + gst_amf_node_free (n1); + + if (transaction_id) { + *transaction_id = gst_amf_node_get_number (n2); + } + gst_amf_node_free (n2); + + if (command_object) { + *command_object = n3; + } else { + gst_amf_node_free (n3); + } + + if (optional_args) { + *optional_args = n4; + } else { + if (n4) + gst_amf_node_free (n4); + } + + return TRUE; +} diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 2fa75ca..8b2b707 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -21,6 +21,7 @@ #define _GST_RTMP_CHUNK_H_ #include <glib.h> +#include "rtmp/amf.h" G_BEGIN_DECLS @@ -99,6 +100,10 @@ GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk); gboolean gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader *header, GBytes * bytes); gboolean gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader *header, GBytes * bytes, GstRtmpChunkHeader *previous_header); +gboolean gst_rtmp_chunk_parse_message (GstRtmpChunk *chunk, + char **command_name, double *transaction_id, + GstAmfNode **command_object, GstAmfNode **optional_args); + /* chunk cache */ diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 8b9eae8..241161e 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -206,6 +206,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, addr = g_network_address_new (client->server_address, client->server_port); client->socket_client = g_socket_client_new (); + g_socket_client_set_timeout (client->socket_client, 5); GST_DEBUG ("g_socket_client_connect_async"); g_socket_client_connect_async (client->socket_client, addr, diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 5ac713a..4952d29 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -40,6 +40,7 @@ static void gst_rtmp_connection_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_rtmp_connection_dispose (GObject * object); static void gst_rtmp_connection_finalize (GObject * object); +static void gst_rtmp_connection_got_closed (GstRtmpConnection * connection); static gboolean gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data); static gboolean gst_rtmp_connection_output_ready (GOutputStream * os, @@ -70,10 +71,20 @@ static void gst_rtmp_connection_start_output (GstRtmpConnection * sc); static void gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, GstRtmpChunk * chunk); - +static void gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc, + GstRtmpChunk * chunk); static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); +typedef struct _CommandCallback CommandCallback; +struct _CommandCallback +{ + int stream_id; + int transaction_id; + GstRtmpCommandCallback func; + gpointer user_data; +}; + enum { PROP_0 @@ -100,6 +111,13 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, got_chunk), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); + g_signal_new ("got-control-chunk", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, + got_control_chunk), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); + g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, closed), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0); } static void @@ -228,8 +246,7 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) return G_SOURCE_REMOVE; } if (ret == 0) { - /* FIXME probably closed */ - GST_ERROR ("closed?"); + gst_rtmp_connection_got_closed (sc); return G_SOURCE_REMOVE; } @@ -294,6 +311,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) } static void +gst_rtmp_connection_got_closed (GstRtmpConnection * connection) +{ + connection->closed = TRUE; + g_signal_emit_by_name (connection, "closed"); +} + +static void gst_rtmp_connection_write_chunk_done (GObject * obj, GAsyncResult * res, gpointer user_data) { @@ -309,8 +333,8 @@ gst_rtmp_connection_write_chunk_done (GObject * obj, ret = g_output_stream_write_bytes_finish (os, res, &error); if (ret < 0) { - GST_ERROR ("write error: %s", error->message); - g_assert_not_reached (); + GST_DEBUG ("write error: %s", error->message); + gst_rtmp_connection_got_closed (connection); g_error_free (error); return; } @@ -526,19 +550,7 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) header.message_length); entry->payload = NULL; - if (entry->chunk->stream_id == 0x02) { - GST_DEBUG ("got protocol control message, type: %d", - entry->chunk->message_type_id); - gst_rtmp_connection_handle_pcm (sc, entry->chunk); - g_signal_emit_by_name (sc, "got-control-chunk", entry->chunk); - } else if (entry->chunk->message_type_id == 0x14) { - /* FIXME parse command */ - g_signal_emit_by_name (sc, "got-command", entry->chunk); - } else { - GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", - entry->chunk->message_length); - g_signal_emit_by_name (sc, "got-chunk", entry->chunk); - } + gst_rtmp_connection_handle_chunk (sc, entry->chunk); g_object_unref (entry->chunk); entry->chunk = NULL; @@ -552,6 +564,48 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) } static void +gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc, GstRtmpChunk * chunk) +{ + if (chunk->stream_id == 0x02) { + GST_DEBUG ("got protocol control message, type: %d", + chunk->message_type_id); + gst_rtmp_connection_handle_pcm (sc, chunk); + g_signal_emit_by_name (sc, "got-control-chunk", chunk); + } else { + if (chunk->message_type_id == 0x14) { + CommandCallback *cb = NULL; + GList *g; + char *command_name; + double transaction_id; + GstAmfNode *command_object; + GstAmfNode *optional_args; + + gst_rtmp_chunk_parse_message (chunk, &command_name, &transaction_id, + &command_object, &optional_args); + for (g = sc->command_callbacks; g; g = g_list_next (g)) { + cb = g->data; + if (cb->stream_id == chunk->stream_id && + cb->transaction_id == transaction_id) { + break; + } + } + if (cb) { + sc->command_callbacks = g_list_remove (sc->command_callbacks, cb); + cb->func (sc, chunk, command_name, transaction_id, command_object, + optional_args, cb->user_data); + g_free (command_name); + gst_amf_node_free (command_object); + if (optional_args) + gst_amf_node_free (optional_args); + g_free (cb); + } + } + GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length); + g_signal_emit_by_name (sc, "got-chunk", chunk); + } +} + +static void gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, GstRtmpChunk * chunk) { @@ -770,7 +824,8 @@ gst_rtmp_connection_dump (GstRtmpConnection * connection) int gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id, const char *command_name, int transaction_id, GstAmfNode * command_object, - GstAmfNode * optional_args) + GstAmfNode * optional_args, GstRtmpCommandCallback response_command, + gpointer user_data) { GstRtmpChunk *chunk; @@ -782,8 +837,59 @@ gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id, chunk->payload = gst_amf_serialize_command (command_name, transaction_id, command_object, optional_args); + chunk->message_length = g_bytes_get_size (chunk->payload); gst_rtmp_connection_queue_chunk (connection, chunk); + if (response_command) { + CommandCallback *callback; + + callback = g_malloc0 (sizeof (CommandCallback)); + callback->stream_id = stream_id; + callback->transaction_id = transaction_id; + callback->func = response_command; + callback->user_data = user_data; + + connection->command_callbacks = + g_list_append (connection->command_callbacks, callback); + } + + return transaction_id; +} + +int +gst_rtmp_connection_send_command2 (GstRtmpConnection * connection, + int chunk_stream_id, int stream_id, + const char *command_name, int transaction_id, GstAmfNode * command_object, + GstAmfNode * optional_args, GstAmfNode * n3, GstAmfNode * n4, + GstRtmpCommandCallback response_command, gpointer user_data) +{ + GstRtmpChunk *chunk; + + chunk = gst_rtmp_chunk_new (); + chunk->stream_id = chunk_stream_id; + chunk->timestamp = 0; /* FIXME */ + chunk->message_type_id = 0x14; + chunk->info = stream_id; + + chunk->payload = gst_amf_serialize_command2 (command_name, transaction_id, + command_object, optional_args, n3, n4); + chunk->message_length = g_bytes_get_size (chunk->payload); + + gst_rtmp_connection_queue_chunk (connection, chunk); + + if (response_command) { + CommandCallback *callback; + + callback = g_malloc0 (sizeof (CommandCallback)); + callback->stream_id = stream_id; + callback->transaction_id = transaction_id; + callback->func = response_command; + callback->user_data = user_data; + + connection->command_callbacks = + g_list_append (connection->command_callbacks, callback); + } + return transaction_id; } diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index aa2b718..3e9a2aa 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -35,6 +35,10 @@ G_BEGIN_DECLS typedef struct _GstRtmpConnection GstRtmpConnection; typedef struct _GstRtmpConnectionClass GstRtmpConnectionClass; typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection *connection); +typedef void (*GstRtmpCommandCallback) (GstRtmpConnection *connection, + GstRtmpChunk *chunk, const char *command_name, int transaction_id, + GstAmfNode *command_object, GstAmfNode *optional_args, + gpointer user_data); struct _GstRtmpConnection { @@ -42,6 +46,7 @@ struct _GstRtmpConnection /* should be properties */ gboolean input_paused; + gboolean closed; /* private */ GSocketConnection *connection; @@ -60,6 +65,7 @@ struct _GstRtmpConnection gboolean handshake_complete; GstRtmpChunkCache *input_chunk_cache; GstRtmpChunkCache *output_chunk_cache; + GList *command_callbacks; /* chunk currently being written */ GstRtmpChunk *output_chunk; @@ -75,11 +81,10 @@ struct _GstRtmpConnectionClass GObjectClass object_class; /* signals */ - void (*got_chunk) (GstRtmpConnection *connection, + void (*got_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk); + void (*got_control_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk); - void (*got_command) (GstRtmpConnection *connection, - const char *command_name, int transaction_id, - GstAmfNode *command_object, GstAmfNode *option_args); + void (*closed) (GstRtmpConnection *connection); }; GType gst_rtmp_connection_get_type (void); @@ -94,7 +99,13 @@ void gst_rtmp_connection_dump (GstRtmpConnection *connection); int gst_rtmp_connection_send_command (GstRtmpConnection *connection, int stream_id, const char *command_name, int transaction_id, - GstAmfNode *command_object, GstAmfNode *optional_args); + GstAmfNode *command_object, GstAmfNode *optional_args, + GstRtmpCommandCallback response_command, gpointer user_data); +int gst_rtmp_connection_send_command2 (GstRtmpConnection *connection, + int chunk_stream_id, int stream_id, const char *command_name, + int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args, + GstAmfNode *n3, GstAmfNode *n4, + GstRtmpCommandCallback response_command, gpointer user_data); G_END_DECLS |