diff options
author | David Schleef <ds@schleef.org> | 2014-08-23 22:41:19 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-23 22:41:19 -0700 |
commit | 6b0cac8398d8a85c3686729cce1f8c1180a90673 (patch) | |
tree | 8d2e9967cc80d01552378e0c918aaf60721cd0cd | |
parent | 45301abe260ed6c517cfc31ff0d6f23a0f1d36d6 (diff) |
hacking
-rw-r--r-- | rtmp/rtmpchunk.c | 155 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 30 | ||||
-rw-r--r-- | rtmp/rtmpserver.c | 43 | ||||
-rw-r--r-- | rtmp/rtmpserver.h | 13 | ||||
-rw-r--r-- | rtmp/rtmpserverconnection.c | 112 | ||||
-rw-r--r-- | rtmp/rtmpserverconnection.h | 5 | ||||
-rw-r--r-- | tools/client-test.c | 12 | ||||
-rw-r--r-- | tools/proxy-server.c | 57 |
8 files changed, 320 insertions, 107 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index de80211..de0f46b 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -119,3 +119,158 @@ gst_rtmp_chunk_finalize (GObject * object) G_OBJECT_CLASS (gst_rtmp_chunk_parent_class)->finalize (object); } + +GstRtmpChunk * +gst_rtmp_chunk_new (void) +{ + return g_object_new (GST_TYPE_RTMP_CHUNK, NULL); +} + +static GstRtmpChunkParseStatus +chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes) +{ + int offset; + const guint8 *data; + gsize size; + int header_fmt; + + if (*needed_bytes) + *needed_bytes = 0; + + data = g_bytes_get_data (bytes, &size); + if (size < 1) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + + header_fmt = data[0] >> 6; + chunk->stream_id = data[0] & 0x3f; + offset = 1; + if (chunk->stream_id == 0) { + if (size < 2) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->stream_id = 64 + data[1]; + offset = 2; + } else if (chunk->stream_id == 1) { + if (size < 3) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->stream_id = 64 + data[1] + (data[2] << 8); + offset = 3; + } + if (header_fmt == 0) { + if (size < offset + 11) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->timestamp = + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + chunk->message_length = + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + if (chunk->timestamp == 0xffffff) { + if (size < offset + 4) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) | + (data[offset + 2] << 8) | data[offset + 3]; + offset += 4; + } + chunk->message_type_id = data[offset + 6]; + offset += 1; + + /* 4 byte something here */ + offset += 4; + } else if (header_fmt == 1) { + if (size < offset + 7) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + GST_ERROR ("unimplemented: need previous chunk"); + chunk->timestamp = + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + chunk->message_length = + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + } else if (header_fmt == 2) { + if (size < offset + 3) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->timestamp = 0; + GST_ERROR ("unimplemented: need previous chunk"); + return GST_RTMP_CHUNK_PARSE_ERROR; + } else { + chunk->timestamp = 0; + GST_ERROR ("unimplemented: need previous chunk"); + return GST_RTMP_CHUNK_PARSE_ERROR; + } + + if (needed_bytes) + *needed_bytes = offset + chunk->message_length; + if (size < offset + chunk->message_length) + return GST_RTMP_CHUNK_PARSE_NEED_BYTES; + + chunk->payload = + g_bytes_new_from_bytes (bytes, offset, chunk->message_length); + return GST_RTMP_CHUNK_PARSE_OK; +} + +GstRtmpChunkParseStatus +gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size) +{ + GstRtmpChunk *chunk; + GstRtmpChunkParseStatus status; + + chunk = gst_rtmp_chunk_new (); + status = chunk_parse (chunk, bytes, chunk_size); + g_object_unref (chunk); + + return status; +} + +GstRtmpChunk * +gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size) +{ + GstRtmpChunk *chunk; + GstRtmpChunkParseStatus status; + + chunk = gst_rtmp_chunk_new (); + status = chunk_parse (chunk, bytes, chunk_size); + if (status == GST_RTMP_CHUNK_PARSE_OK) + return chunk; + + g_object_unref (chunk); + return NULL; +} + +void +gst_rtmp_chunk_set_stream_id (GstRtmpChunk * chunk, guint32 stream_id) +{ + chunk->stream_id = stream_id; +} + +void +gst_rtmp_chunk_set_timestamp (GstRtmpChunk * chunk, guint32 timestamp) +{ + chunk->timestamp = timestamp; +} + +void +gst_rtmp_chunk_set_payload (GstRtmpChunk * chunk, GBytes * payload) +{ + if (chunk->payload) { + g_bytes_unref (chunk->payload); + } + chunk->payload = payload; +} + +guint32 +gst_rtmp_chunk_get_stream_id (GstRtmpChunk * chunk) +{ + return chunk->stream_id; +} + +guint32 +gst_rtmp_chunk_get_timestamp (GstRtmpChunk * chunk) +{ + return chunk->timestamp; +} + +GBytes * +gst_rtmp_chunk_get_payload (GstRtmpChunk * chunk) +{ + return chunk->payload; +} diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 5e799ee..68edfcb 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -20,6 +20,7 @@ #ifndef _GST_RTMP_CHUNK_H_ #define _GST_RTMP_CHUNK_H_ +#include <glib.h> G_BEGIN_DECLS @@ -36,13 +37,12 @@ struct _GstRtmpChunk { GObject object; - guint32 timestamp; - int type_id; - guint32 chunk_stream_id; guint32 stream_id; - int length; - guint8 *data; + guint32 timestamp; + gsize message_length; + int message_type_id; + GBytes *payload; }; struct _GstRtmpChunkClass @@ -50,8 +50,28 @@ struct _GstRtmpChunkClass GObjectClass object_class; }; +typedef enum { + GST_RTMP_CHUNK_PARSE_ERROR = 0, + GST_RTMP_CHUNK_PARSE_OK, + GST_RTMP_CHUNK_PARSE_UNKNOWN, + GST_RTMP_CHUNK_PARSE_NEED_BYTES, +} GstRtmpChunkParseStatus; + GType gst_rtmp_chunk_get_type (void); +GstRtmpChunk *gst_rtmp_chunk_new (void); +GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes, + gsize *chunk_size); +GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size); + +void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id); +void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp); +void gst_rtmp_chunk_set_payload (GstRtmpChunk *chunk, GBytes *payload); + +guint32 gst_rtmp_chunk_get_stream_id (GstRtmpChunk *chunk); +guint32 gst_rtmp_chunk_get_timestamp (GstRtmpChunk *chunk); +GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk); + G_END_DECLS #endif diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c index 0b089b4..4996073 100644 --- a/rtmp/rtmpserver.c +++ b/rtmp/rtmpserver.c @@ -22,8 +22,7 @@ #endif #include <gst/gst.h> -#include "rtmpserver.h" -#include "rtmpserverconnection.h" +#include <rtmp/rtmpserver.h> GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_debug_category); #define GST_CAT_DEFAULT gst_rtmp_server_debug_category @@ -62,6 +61,14 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass) gobject_class->dispose = gst_rtmp_server_dispose; gobject_class->finalize = gst_rtmp_server_finalize; + g_signal_new ("add-connection", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass, + add_connection), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION); + g_signal_new ("remove-connection", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass, + remove_connection), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION); } static void @@ -108,6 +115,13 @@ gst_rtmp_server_dispose (GObject * object) GST_DEBUG_OBJECT (rtmpserver, "dispose"); /* clean up as possible. may be called multiple times */ + g_list_free_full (rtmpserver->connections, g_object_unref); + rtmpserver->connections = NULL; + + if (rtmpserver->socket_service) { + g_object_unref (rtmpserver->socket_service); + rtmpserver->socket_service = NULL; + } G_OBJECT_CLASS (gst_rtmp_server_parent_class)->dispose (object); } @@ -144,8 +158,8 @@ gst_rtmp_server_start (GstRtmpServer * rtmpserver) rtmpserver->socket_service = g_socket_service_new (); ret = - g_socket_listener_add_inet_port (G_SOCKET_LISTENER (rtmpserver-> - socket_service), rtmpserver->port, NULL, &error); + g_socket_listener_add_inet_port (G_SOCKET_LISTENER + (rtmpserver->socket_service), rtmpserver->port, NULL, &error); if (!ret) { GST_ERROR ("failed to add address: %s", error->message); g_object_unref (rtmpserver->socket_service); @@ -168,8 +182,25 @@ gst_rtmp_server_incoming (GSocketService * service, g_object_ref (connection); server_connection = gst_rtmp_server_connection_new (connection); - (void) server_connection; - (void) rtmpserver; + gst_rtmp_server_add_connection (rtmpserver, server_connection); return TRUE; } + +void +gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver, + GstRtmpServerConnection * connection) +{ + rtmpserver->connections = g_list_prepend (rtmpserver->connections, + connection); + g_signal_emit_by_name (rtmpserver, "add-connection", connection); +} + +void +gst_rtmp_server_remove_connection (GstRtmpServer * rtmpserver, + GstRtmpServerConnection * connection) +{ + rtmpserver->connections = g_list_remove (rtmpserver->connections, connection); + g_signal_emit_by_name (rtmpserver, "remove-connection", connection); + g_object_unref (connection); +} diff --git a/rtmp/rtmpserver.h b/rtmp/rtmpserver.h index e1e0466..75c71e3 100644 --- a/rtmp/rtmpserver.h +++ b/rtmp/rtmpserver.h @@ -22,6 +22,8 @@ #include <gio/gio.h> +#include <rtmp/rtmpserverconnection.h> + G_BEGIN_DECLS #define GST_TYPE_RTMP_SERVER (gst_rtmp_server_get_type()) @@ -42,18 +44,29 @@ struct _GstRtmpServer /* private */ GSocketService *socket_service; + GList *connections; }; struct _GstRtmpServerClass { GObjectClass object_class; + + /* signals */ + void (*add_connection) (GstRtmpServer *server, + GstRtmpServerConnection *connection); + void (*remove_connection) (GstRtmpServer *server, + GstRtmpServerConnection *connection); }; GType gst_rtmp_server_get_type (void); GstRtmpServer *gst_rtmp_server_new (void); void gst_rtmp_server_start (GstRtmpServer * rtmpserver); +void gst_rtmp_server_add_connection (GstRtmpServer *rtmpserver, + GstRtmpServerConnection *connection); +void gst_rtmp_server_remove_connection (GstRtmpServer *rtmpserver, + GstRtmpServerConnection *connection); G_END_DECLS diff --git a/rtmp/rtmpserverconnection.c b/rtmp/rtmpserverconnection.c index b3b08da..ef79d19 100644 --- a/rtmp/rtmpserverconnection.c +++ b/rtmp/rtmpserverconnection.c @@ -23,6 +23,7 @@ #include <gst/gst.h> #include "rtmpserverconnection.h" +#include "rtmpchunk.h" #include "amf.h" #include <string.h> @@ -66,6 +67,10 @@ gst_rtmp_server_connection_class_init (GstRtmpServerConnectionClass * klass) gobject_class->dispose = gst_rtmp_server_connection_dispose; gobject_class->finalize = gst_rtmp_server_connection_finalize; + g_signal_new ("got-chunk", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerConnectionClass, + got_chunk), NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK); } static void @@ -147,10 +152,6 @@ gst_rtmp_server_connection_new (GSocketConnection * connection) return sc; } -static void -gst_rtmp_server_connection_read_chunk_done (GObject * obj, - GAsyncResult * res, gpointer user_data); - typedef struct _ChunkRead ChunkRead; struct _ChunkRead { @@ -229,21 +230,15 @@ static void gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * sc) { GInputStream *is; - ChunkRead *chunk; - - chunk = g_malloc0 (sizeof (ChunkRead)); - chunk->alloc_size = 4096; - chunk->data = g_malloc (chunk->alloc_size); - chunk->server_connection = sc; is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); - g_input_stream_read_async (is, chunk->data, chunk->alloc_size, + g_input_stream_read_bytes_async (is, 4096, G_PRIORITY_DEFAULT, sc->cancellable, - gst_rtmp_server_connection_read_chunk_done, chunk); + gst_rtmp_server_connection_read_chunk_done, sc); } -static void +G_GNUC_UNUSED static void parse_message (guint8 * data, int size) { int offset; @@ -275,99 +270,38 @@ parse_message (guint8 * data, int size) } -typedef struct _Chunk Chunk; -struct _Chunk -{ - int stream_id; - int header_fmt; - guint32 timestamp; - int message_length; - int message_type_id; - guint32 message_stream_id; -}; - -static void -parse_chunk (guint8 * data, int size) -{ - Chunk _chunk, *chunk = &_chunk; - int offset; - - chunk->header_fmt = data[0] >> 6; - chunk->stream_id = data[0] & 0x3f; - offset = 1; - if (chunk->stream_id == 0) { - chunk->stream_id = 64 + data[1]; - offset = 2; - } else if (chunk->stream_id == 1) { - chunk->stream_id = 64 + data[1] + (data[2] << 8); - offset = 3; - } - chunk->timestamp = - (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; - chunk->message_length = - (data[offset + 3] << 16) | (data[offset + 4] << 8) | data[offset + 5]; - chunk->message_type_id = data[offset + 6]; - offset += 7; - - g_print ("header_fmt: %d\n", chunk->header_fmt); - g_print ("stream_id: %d\n", chunk->stream_id); - g_print ("timestamp: %d\n", chunk->timestamp); - g_print ("message_length: %d\n", chunk->message_length); - g_print ("message_type_id: %d\n", chunk->message_type_id); - - parse_message (data + offset, size - offset); -} - -static void -dump_data (guint8 * data, int size) -{ - int i, j; - for (i = 0; i < size; i += 16) { - g_print ("%04x: ", i); - for (j = 0; j < 16; j++) { - if (i + j < size) { - g_print ("%02x ", data[i + j]); - } else { - g_print (" "); - } - } - for (j = 0; j < 16; j++) { - if (i + j < size) { - g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.'); - } - } - g_print ("\n"); - } -} - static void gst_rtmp_server_connection_read_chunk_done (GObject * obj, GAsyncResult * res, gpointer user_data) { GInputStream *is = G_INPUT_STREAM (obj); - ChunkRead *chunk = (ChunkRead *) user_data; + GstRtmpServerConnection *server_connection = + GST_RTMP_SERVER_CONNECTION (user_data); + GstRtmpChunk *chunk; GError *error = NULL; - gssize ret; + gsize chunk_size; + GBytes *bytes; GST_ERROR ("gst_rtmp_server_connection_read_chunk_done"); - ret = g_input_stream_read_finish (is, res, &error); - if (ret < 0) { + bytes = g_input_stream_read_bytes_finish (is, res, &error); + if (bytes == NULL) { GST_ERROR ("read error: %s", error->message); g_error_free (error); return; } - GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", ret); - chunk->size = ret; + GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes)); + + chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size); - parse_chunk (chunk->data, chunk->size); - dump_data (chunk->data, chunk->size); + if (chunk) { + g_signal_emit_by_name (server_connection, "got-chunk", chunk); - if (0) - proxy_write_chunk (chunk->server_connection, chunk); + g_object_unref (chunk); + } } -static void +G_GNUC_UNUSED static void proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk) { GOutputStream *os; diff --git a/rtmp/rtmpserverconnection.h b/rtmp/rtmpserverconnection.h index ab1f53c..19f06ab 100644 --- a/rtmp/rtmpserverconnection.h +++ b/rtmp/rtmpserverconnection.h @@ -21,6 +21,7 @@ #define _GST_RTMP_SERVER_CONNECTION_H_ #include <gio/gio.h> +#include <rtmp/rtmpchunk.h> G_BEGIN_DECLS @@ -48,6 +49,10 @@ struct _GstRtmpServerConnection struct _GstRtmpServerConnectionClass { GObjectClass object_class; + + /* signals */ + void (*got_chunk) (GstRtmpServerConnection *connection, + GstRtmpChunk *chunk); }; GType gst_rtmp_server_connection_get_type (void); diff --git a/tools/client-test.c b/tools/client-test.c index 8a0e9f4..1d8b0ec 100644 --- a/tools/client-test.c +++ b/tools/client-test.c @@ -36,7 +36,7 @@ static GOptionEntry entries[] = { }; static void -connect_done (GObject *source, GAsyncResult *result, gpointer user_data); +connect_done (GObject * source, GAsyncResult * result, gpointer user_data); int main (int argc, char *argv[]) @@ -62,8 +62,7 @@ main (int argc, char *argv[]) main_loop = g_main_loop_new (NULL, TRUE); - gst_rtmp_client_connect_async (client, cancellable, connect_done, - client); + gst_rtmp_client_connect_async (client, cancellable, connect_done, client); g_main_loop_run (main_loop); @@ -71,7 +70,7 @@ main (int argc, char *argv[]) } static void -connect_done (GObject *source, GAsyncResult *result, gpointer user_data) +connect_done (GObject * source, GAsyncResult * result, gpointer user_data) { GstRtmpClient *client = user_data; GError *error = NULL; @@ -79,10 +78,9 @@ connect_done (GObject *source, GAsyncResult *result, gpointer user_data) ret = gst_rtmp_client_connect_finish (client, result, &error); if (!ret) { - GST_ERROR("error: %s", error->message); + GST_ERROR ("error: %s", error->message); g_error_free (error); } - GST_ERROR("got here"); + GST_ERROR ("got here"); } - diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 3920d5f..37629af 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -28,6 +28,15 @@ #define GETTEXT_PACKAGE NULL +static void +add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection, + gpointer user_data); +static void +got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk, + gpointer user_data); +static void dump_data (GBytes * bytes); + + gboolean verbose; static GOptionEntry entries[] = { @@ -54,6 +63,8 @@ main (int argc, char *argv[]) g_option_context_free (context); server = gst_rtmp_server_new (); + g_signal_connect (server, "add-connection", G_CALLBACK (add_connection), + NULL); gst_rtmp_server_start (server); main_loop = g_main_loop_new (NULL, TRUE); @@ -62,3 +73,49 @@ main (int argc, char *argv[]) exit (0); } +static void +add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection, + gpointer user_data) +{ + GST_ERROR ("new connection"); + + g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL); +} + +static void +got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk, + gpointer user_data) +{ + GBytes *bytes; + + GST_ERROR ("got chunk"); + + bytes = gst_rtmp_chunk_get_payload (chunk); + dump_data (bytes); +} + +static void +dump_data (GBytes * bytes) +{ + const guint8 *data; + gsize size; + int i, j; + + data = g_bytes_get_data (bytes, &size); + for (i = 0; i < size; i += 16) { + g_print ("%04x: ", i); + for (j = 0; j < 16; j++) { + if (i + j < size) { + g_print ("%02x ", data[i + j]); + } else { + g_print (" "); + } + } + for (j = 0; j < 16; j++) { + if (i + j < size) { + g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.'); + } + } + g_print ("\n"); + } +} |