summaryrefslogtreecommitdiff
path: root/rtmp
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-30 13:17:13 -0700
committerDavid Schleef <ds@schleef.org>2014-08-30 13:17:13 -0700
commitcff0d57190f3f0b9c9d0c30012b959f12055da08 (patch)
tree2eadf183af3c3d6b1062eebd3f06053be16e0378 /rtmp
parentbf029aebe192f30de5d79281d0b8c44f94cfd233 (diff)
hacking
Diffstat (limited to 'rtmp')
-rw-r--r--rtmp/amf.c81
-rw-r--r--rtmp/amf.h10
-rw-r--r--rtmp/rtmpchunk.c63
-rw-r--r--rtmp/rtmpchunk.h5
-rw-r--r--rtmp/rtmpclient.c1
-rw-r--r--rtmp/rtmpconnection.c144
-rw-r--r--rtmp/rtmpconnection.h21
7 files changed, 293 insertions, 32 deletions
diff --git a/rtmp/amf.c b/rtmp/amf.c
index a510e67..c942fea 100644
--- a/rtmp/amf.c
+++ b/rtmp/amf.c
@@ -552,11 +552,13 @@ gst_amf_serialize_command (const char *command_name, int transaction_id,
serializer->size = 4096;
serializer->data = g_malloc (serializer->size);
+ _serialize_u8 (serializer, GST_AMF_TYPE_STRING);
_serialize_utf8_string (serializer, command_name);
+ _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER);
_serialize_number (serializer, transaction_id);
- _serialize_object (serializer, command_object);
+ _serialize_value (serializer, command_object);
if (optional_args)
- _serialize_object (serializer, optional_args);
+ _serialize_value (serializer, optional_args);
if (serializer->error) {
GST_ERROR ("failed to serialize");
@@ -565,3 +567,78 @@ gst_amf_serialize_command (const char *command_name, int transaction_id,
}
return g_bytes_new_take (serializer->data, serializer->offset);
}
+
+GBytes *
+gst_amf_serialize_command2 (const char *command_name, int transaction_id,
+ GstAmfNode * command_object, GstAmfNode * optional_args, GstAmfNode * n3,
+ GstAmfNode * n4)
+{
+ AmfSerializer _s = { 0 }, *serializer = &_s;
+
+ serializer->size = 4096;
+ serializer->data = g_malloc (serializer->size);
+
+ _serialize_u8 (serializer, GST_AMF_TYPE_STRING);
+ _serialize_utf8_string (serializer, command_name);
+ _serialize_u8 (serializer, GST_AMF_TYPE_NUMBER);
+ _serialize_number (serializer, transaction_id);
+ _serialize_value (serializer, command_object);
+ if (optional_args)
+ _serialize_value (serializer, optional_args);
+ if (n3)
+ _serialize_value (serializer, n3);
+ if (n4)
+ _serialize_value (serializer, n4);
+
+ if (serializer->error) {
+ GST_ERROR ("failed to serialize");
+ g_free (serializer->data);
+ return NULL;
+ }
+ return g_bytes_new_take (serializer->data, serializer->offset);
+}
+
+gboolean
+gst_amf_node_get_boolean (const GstAmfNode * node)
+{
+ return node->int_val;
+}
+
+const char *
+gst_amf_node_get_string (const GstAmfNode * node)
+{
+ return node->string_val;
+}
+
+double
+gst_amf_node_get_number (const GstAmfNode * node)
+{
+ return node->double_val;
+}
+
+const GstAmfNode *
+gst_amf_node_get_object (const GstAmfNode * node, const char *field_name)
+{
+ int i;
+ for (i = 0; i < node->array_val->len; i++) {
+ AmfObjectField *field = g_ptr_array_index (node->array_val, i);
+ if (strcmp (field->name, field_name) == 0) {
+ return field->value;
+ }
+ }
+ return NULL;
+}
+
+int
+gst_amf_node_get_object_length (const GstAmfNode * node)
+{
+ return node->array_val->len;
+}
+
+const GstAmfNode *
+gst_amf_node_get_object_by_index (const GstAmfNode * node, int i)
+{
+ AmfObjectField *field;
+ field = g_ptr_array_index (node->array_val, i);
+ return field->value;
+}
diff --git a/rtmp/amf.h b/rtmp/amf.h
index 0ace7d3..4ab9676 100644
--- a/rtmp/amf.h
+++ b/rtmp/amf.h
@@ -70,6 +70,13 @@ void gst_amf_node_set_ecma_array (GstAmfNode *node, guint8 *data, int size);
void gst_amf_object_append_take (GstAmfNode *node, const char *s,
GstAmfNode *child_node);
+gboolean gst_amf_node_get_boolean (const GstAmfNode *node);
+const char *gst_amf_node_get_string (const GstAmfNode *node);
+double gst_amf_node_get_number (const GstAmfNode *node);
+const GstAmfNode *gst_amf_node_get_object (const GstAmfNode *node, const char *field_name);
+int gst_amf_node_get_object_length (const GstAmfNode *node);
+const GstAmfNode *gst_amf_node_get_object_by_index (const GstAmfNode *node, int i);
+
void gst_amf_object_set_number (GstAmfNode *node, const char *field_name,
double val);
void gst_amf_object_set_string (GstAmfNode *node, const char *field_name,
@@ -77,6 +84,9 @@ void gst_amf_object_set_string (GstAmfNode *node, const char *field_name,
GBytes * gst_amf_serialize_command (const char *command_name,
int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args);
+GBytes * gst_amf_serialize_command2 (const char *command_name,
+ int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args,
+ GstAmfNode *n3, GstAmfNode *n4);
G_END_DECLS
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index b23fe45..6a22c9e 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -193,8 +193,8 @@ gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes,
header->message_type_id = data[offset];
offset += 1;
- header->info = (data[offset] << 24) | (data[offset + 1] << 16) |
- (data[offset + 2] << 8) | data[offset + 3];
+ header->info = (data[offset + 3] << 24) | (data[offset + 2] << 16) |
+ (data[offset + 1] << 8) | data[offset];
offset += 4;
} else {
header->timestamp = previous_header->timestamp;
@@ -262,10 +262,10 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk,
data[5] = (chunk->message_length >> 8) & 0xff;
data[6] = chunk->message_length & 0xff;
data[7] = chunk->message_type_id;
- data[8] = (chunk->info >> 24) & 0xff;
- data[9] = (chunk->info >> 16) & 0xff;
- data[10] = (chunk->info >> 8) & 0xff;
- data[11] = chunk->info & 0xff;
+ data[8] = chunk->info & 0xff;
+ data[9] = (chunk->info >> 8) & 0xff;
+ data[10] = (chunk->info >> 16) & 0xff;
+ data[11] = (chunk->info >> 24) & 0xff;
offset = 12;
} else {
data[1] = (timestamp >> 16) & 0xff;
@@ -377,3 +377,54 @@ gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry,
entry->previous_header.message_type_id = chunk->message_type_id;
entry->previous_header.info = chunk->info;
}
+
+gboolean
+gst_rtmp_chunk_parse_message (GstRtmpChunk * chunk, char **command_name,
+ double *transaction_id, GstAmfNode ** command_object,
+ GstAmfNode ** optional_args)
+{
+ gsize n_parsed;
+ const guint8 *data;
+ gsize size;
+ int offset;
+ GstAmfNode *n1, *n2, *n3, *n4;
+
+ offset = 0;
+ data = g_bytes_get_data (chunk->payload, &size);
+ n1 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ offset += n_parsed;
+ n2 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ offset += n_parsed;
+ n3 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ offset += n_parsed;
+ if (offset < size) {
+ n4 = gst_amf_node_new_parse (data + offset, size - offset, &n_parsed);
+ } else {
+ n4 = NULL;
+ }
+
+ if (command_name) {
+ *command_name = g_strdup (gst_amf_node_get_string (n1));
+ }
+ gst_amf_node_free (n1);
+
+ if (transaction_id) {
+ *transaction_id = gst_amf_node_get_number (n2);
+ }
+ gst_amf_node_free (n2);
+
+ if (command_object) {
+ *command_object = n3;
+ } else {
+ gst_amf_node_free (n3);
+ }
+
+ if (optional_args) {
+ *optional_args = n4;
+ } else {
+ if (n4)
+ gst_amf_node_free (n4);
+ }
+
+ return TRUE;
+}
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 2fa75ca..8b2b707 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -21,6 +21,7 @@
#define _GST_RTMP_CHUNK_H_
#include <glib.h>
+#include "rtmp/amf.h"
G_BEGIN_DECLS
@@ -99,6 +100,10 @@ GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk);
gboolean gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader *header, GBytes * bytes);
gboolean gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader *header, GBytes * bytes,
GstRtmpChunkHeader *previous_header);
+gboolean gst_rtmp_chunk_parse_message (GstRtmpChunk *chunk,
+ char **command_name, double *transaction_id,
+ GstAmfNode **command_object, GstAmfNode **optional_args);
+
/* chunk cache */
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 8b9eae8..241161e 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -206,6 +206,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
addr = g_network_address_new (client->server_address, client->server_port);
client->socket_client = g_socket_client_new ();
+ g_socket_client_set_timeout (client->socket_client, 5);
GST_DEBUG ("g_socket_client_connect_async");
g_socket_client_connect_async (client->socket_client, addr,
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 5ac713a..4952d29 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -40,6 +40,7 @@ static void gst_rtmp_connection_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object);
+static void gst_rtmp_connection_got_closed (GstRtmpConnection * connection);
static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
gpointer user_data);
static gboolean gst_rtmp_connection_output_ready (GOutputStream * os,
@@ -70,10 +71,20 @@ static void gst_rtmp_connection_start_output (GstRtmpConnection * sc);
static void
gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
GstRtmpChunk * chunk);
-
+static void gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc,
+ GstRtmpChunk * chunk);
static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
+typedef struct _CommandCallback CommandCallback;
+struct _CommandCallback
+{
+ int stream_id;
+ int transaction_id;
+ GstRtmpCommandCallback func;
+ gpointer user_data;
+};
+
enum
{
PROP_0
@@ -100,6 +111,13 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass,
got_chunk), NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
+ g_signal_new ("got-control-chunk", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass,
+ got_control_chunk), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
+ g_signal_new ("closed", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpConnectionClass, closed),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0);
}
static void
@@ -228,8 +246,7 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
return G_SOURCE_REMOVE;
}
if (ret == 0) {
- /* FIXME probably closed */
- GST_ERROR ("closed?");
+ gst_rtmp_connection_got_closed (sc);
return G_SOURCE_REMOVE;
}
@@ -294,6 +311,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
}
static void
+gst_rtmp_connection_got_closed (GstRtmpConnection * connection)
+{
+ connection->closed = TRUE;
+ g_signal_emit_by_name (connection, "closed");
+}
+
+static void
gst_rtmp_connection_write_chunk_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
@@ -309,8 +333,8 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
ret = g_output_stream_write_bytes_finish (os, res, &error);
if (ret < 0) {
- GST_ERROR ("write error: %s", error->message);
- g_assert_not_reached ();
+ GST_DEBUG ("write error: %s", error->message);
+ gst_rtmp_connection_got_closed (connection);
g_error_free (error);
return;
}
@@ -526,19 +550,7 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
header.message_length);
entry->payload = NULL;
- if (entry->chunk->stream_id == 0x02) {
- GST_DEBUG ("got protocol control message, type: %d",
- entry->chunk->message_type_id);
- gst_rtmp_connection_handle_pcm (sc, entry->chunk);
- g_signal_emit_by_name (sc, "got-control-chunk", entry->chunk);
- } else if (entry->chunk->message_type_id == 0x14) {
- /* FIXME parse command */
- g_signal_emit_by_name (sc, "got-command", entry->chunk);
- } else {
- GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes",
- entry->chunk->message_length);
- g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
- }
+ gst_rtmp_connection_handle_chunk (sc, entry->chunk);
g_object_unref (entry->chunk);
entry->chunk = NULL;
@@ -552,6 +564,48 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
}
static void
+gst_rtmp_connection_handle_chunk (GstRtmpConnection * sc, GstRtmpChunk * chunk)
+{
+ if (chunk->stream_id == 0x02) {
+ GST_DEBUG ("got protocol control message, type: %d",
+ chunk->message_type_id);
+ gst_rtmp_connection_handle_pcm (sc, chunk);
+ g_signal_emit_by_name (sc, "got-control-chunk", chunk);
+ } else {
+ if (chunk->message_type_id == 0x14) {
+ CommandCallback *cb = NULL;
+ GList *g;
+ char *command_name;
+ double transaction_id;
+ GstAmfNode *command_object;
+ GstAmfNode *optional_args;
+
+ gst_rtmp_chunk_parse_message (chunk, &command_name, &transaction_id,
+ &command_object, &optional_args);
+ for (g = sc->command_callbacks; g; g = g_list_next (g)) {
+ cb = g->data;
+ if (cb->stream_id == chunk->stream_id &&
+ cb->transaction_id == transaction_id) {
+ break;
+ }
+ }
+ if (cb) {
+ sc->command_callbacks = g_list_remove (sc->command_callbacks, cb);
+ cb->func (sc, chunk, command_name, transaction_id, command_object,
+ optional_args, cb->user_data);
+ g_free (command_name);
+ gst_amf_node_free (command_object);
+ if (optional_args)
+ gst_amf_node_free (optional_args);
+ g_free (cb);
+ }
+ }
+ GST_DEBUG ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length);
+ g_signal_emit_by_name (sc, "got-chunk", chunk);
+ }
+}
+
+static void
gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
GstRtmpChunk * chunk)
{
@@ -770,7 +824,8 @@ gst_rtmp_connection_dump (GstRtmpConnection * connection)
int
gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id,
const char *command_name, int transaction_id, GstAmfNode * command_object,
- GstAmfNode * optional_args)
+ GstAmfNode * optional_args, GstRtmpCommandCallback response_command,
+ gpointer user_data)
{
GstRtmpChunk *chunk;
@@ -782,8 +837,59 @@ gst_rtmp_connection_send_command (GstRtmpConnection * connection, int stream_id,
chunk->payload = gst_amf_serialize_command (command_name, transaction_id,
command_object, optional_args);
+ chunk->message_length = g_bytes_get_size (chunk->payload);
gst_rtmp_connection_queue_chunk (connection, chunk);
+ if (response_command) {
+ CommandCallback *callback;
+
+ callback = g_malloc0 (sizeof (CommandCallback));
+ callback->stream_id = stream_id;
+ callback->transaction_id = transaction_id;
+ callback->func = response_command;
+ callback->user_data = user_data;
+
+ connection->command_callbacks =
+ g_list_append (connection->command_callbacks, callback);
+ }
+
+ return transaction_id;
+}
+
+int
+gst_rtmp_connection_send_command2 (GstRtmpConnection * connection,
+ int chunk_stream_id, int stream_id,
+ const char *command_name, int transaction_id, GstAmfNode * command_object,
+ GstAmfNode * optional_args, GstAmfNode * n3, GstAmfNode * n4,
+ GstRtmpCommandCallback response_command, gpointer user_data)
+{
+ GstRtmpChunk *chunk;
+
+ chunk = gst_rtmp_chunk_new ();
+ chunk->stream_id = chunk_stream_id;
+ chunk->timestamp = 0; /* FIXME */
+ chunk->message_type_id = 0x14;
+ chunk->info = stream_id;
+
+ chunk->payload = gst_amf_serialize_command2 (command_name, transaction_id,
+ command_object, optional_args, n3, n4);
+ chunk->message_length = g_bytes_get_size (chunk->payload);
+
+ gst_rtmp_connection_queue_chunk (connection, chunk);
+
+ if (response_command) {
+ CommandCallback *callback;
+
+ callback = g_malloc0 (sizeof (CommandCallback));
+ callback->stream_id = stream_id;
+ callback->transaction_id = transaction_id;
+ callback->func = response_command;
+ callback->user_data = user_data;
+
+ connection->command_callbacks =
+ g_list_append (connection->command_callbacks, callback);
+ }
+
return transaction_id;
}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index aa2b718..3e9a2aa 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -35,6 +35,10 @@ G_BEGIN_DECLS
typedef struct _GstRtmpConnection GstRtmpConnection;
typedef struct _GstRtmpConnectionClass GstRtmpConnectionClass;
typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection *connection);
+typedef void (*GstRtmpCommandCallback) (GstRtmpConnection *connection,
+ GstRtmpChunk *chunk, const char *command_name, int transaction_id,
+ GstAmfNode *command_object, GstAmfNode *optional_args,
+ gpointer user_data);
struct _GstRtmpConnection
{
@@ -42,6 +46,7 @@ struct _GstRtmpConnection
/* should be properties */
gboolean input_paused;
+ gboolean closed;
/* private */
GSocketConnection *connection;
@@ -60,6 +65,7 @@ struct _GstRtmpConnection
gboolean handshake_complete;
GstRtmpChunkCache *input_chunk_cache;
GstRtmpChunkCache *output_chunk_cache;
+ GList *command_callbacks;
/* chunk currently being written */
GstRtmpChunk *output_chunk;
@@ -75,11 +81,10 @@ struct _GstRtmpConnectionClass
GObjectClass object_class;
/* signals */
- void (*got_chunk) (GstRtmpConnection *connection,
+ void (*got_chunk) (GstRtmpConnection *connection, GstRtmpChunk *chunk);
+ void (*got_control_chunk) (GstRtmpConnection *connection,
GstRtmpChunk *chunk);
- void (*got_command) (GstRtmpConnection *connection,
- const char *command_name, int transaction_id,
- GstAmfNode *command_object, GstAmfNode *option_args);
+ void (*closed) (GstRtmpConnection *connection);
};
GType gst_rtmp_connection_get_type (void);
@@ -94,7 +99,13 @@ void gst_rtmp_connection_dump (GstRtmpConnection *connection);
int gst_rtmp_connection_send_command (GstRtmpConnection *connection,
int stream_id, const char *command_name, int transaction_id,
- GstAmfNode *command_object, GstAmfNode *optional_args);
+ GstAmfNode *command_object, GstAmfNode *optional_args,
+ GstRtmpCommandCallback response_command, gpointer user_data);
+int gst_rtmp_connection_send_command2 (GstRtmpConnection *connection,
+ int chunk_stream_id, int stream_id, const char *command_name,
+ int transaction_id, GstAmfNode *command_object, GstAmfNode *optional_args,
+ GstAmfNode *n3, GstAmfNode *n4,
+ GstRtmpCommandCallback response_command, gpointer user_data);
G_END_DECLS