summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-23 22:41:19 -0700
committerDavid Schleef <ds@schleef.org>2014-08-23 22:41:19 -0700
commit6b0cac8398d8a85c3686729cce1f8c1180a90673 (patch)
tree8d2e9967cc80d01552378e0c918aaf60721cd0cd
parent45301abe260ed6c517cfc31ff0d6f23a0f1d36d6 (diff)
hacking
-rw-r--r--rtmp/rtmpchunk.c155
-rw-r--r--rtmp/rtmpchunk.h30
-rw-r--r--rtmp/rtmpserver.c43
-rw-r--r--rtmp/rtmpserver.h13
-rw-r--r--rtmp/rtmpserverconnection.c112
-rw-r--r--rtmp/rtmpserverconnection.h5
-rw-r--r--tools/client-test.c12
-rw-r--r--tools/proxy-server.c57
8 files changed, 320 insertions, 107 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index de80211..de0f46b 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -119,3 +119,158 @@ gst_rtmp_chunk_finalize (GObject * object)
G_OBJECT_CLASS (gst_rtmp_chunk_parent_class)->finalize (object);
}
+
+GstRtmpChunk *
+gst_rtmp_chunk_new (void)
+{
+ return g_object_new (GST_TYPE_RTMP_CHUNK, NULL);
+}
+
+static GstRtmpChunkParseStatus
+chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes)
+{
+ int offset;
+ const guint8 *data;
+ gsize size;
+ int header_fmt;
+
+ if (*needed_bytes)
+ *needed_bytes = 0;
+
+ data = g_bytes_get_data (bytes, &size);
+ if (size < 1)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+
+ header_fmt = data[0] >> 6;
+ chunk->stream_id = data[0] & 0x3f;
+ offset = 1;
+ if (chunk->stream_id == 0) {
+ if (size < 2)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->stream_id = 64 + data[1];
+ offset = 2;
+ } else if (chunk->stream_id == 1) {
+ if (size < 3)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->stream_id = 64 + data[1] + (data[2] << 8);
+ offset = 3;
+ }
+ if (header_fmt == 0) {
+ if (size < offset + 11)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->timestamp =
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ chunk->message_length =
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ if (chunk->timestamp == 0xffffff) {
+ if (size < offset + 4)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) |
+ (data[offset + 2] << 8) | data[offset + 3];
+ offset += 4;
+ }
+ chunk->message_type_id = data[offset + 6];
+ offset += 1;
+
+ /* 4 byte something here */
+ offset += 4;
+ } else if (header_fmt == 1) {
+ if (size < offset + 7)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ GST_ERROR ("unimplemented: need previous chunk");
+ chunk->timestamp =
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ chunk->message_length =
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ } else if (header_fmt == 2) {
+ if (size < offset + 3)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->timestamp = 0;
+ GST_ERROR ("unimplemented: need previous chunk");
+ return GST_RTMP_CHUNK_PARSE_ERROR;
+ } else {
+ chunk->timestamp = 0;
+ GST_ERROR ("unimplemented: need previous chunk");
+ return GST_RTMP_CHUNK_PARSE_ERROR;
+ }
+
+ if (needed_bytes)
+ *needed_bytes = offset + chunk->message_length;
+ if (size < offset + chunk->message_length)
+ return GST_RTMP_CHUNK_PARSE_NEED_BYTES;
+
+ chunk->payload =
+ g_bytes_new_from_bytes (bytes, offset, chunk->message_length);
+ return GST_RTMP_CHUNK_PARSE_OK;
+}
+
+GstRtmpChunkParseStatus
+gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size)
+{
+ GstRtmpChunk *chunk;
+ GstRtmpChunkParseStatus status;
+
+ chunk = gst_rtmp_chunk_new ();
+ status = chunk_parse (chunk, bytes, chunk_size);
+ g_object_unref (chunk);
+
+ return status;
+}
+
+GstRtmpChunk *
+gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size)
+{
+ GstRtmpChunk *chunk;
+ GstRtmpChunkParseStatus status;
+
+ chunk = gst_rtmp_chunk_new ();
+ status = chunk_parse (chunk, bytes, chunk_size);
+ if (status == GST_RTMP_CHUNK_PARSE_OK)
+ return chunk;
+
+ g_object_unref (chunk);
+ return NULL;
+}
+
+void
+gst_rtmp_chunk_set_stream_id (GstRtmpChunk * chunk, guint32 stream_id)
+{
+ chunk->stream_id = stream_id;
+}
+
+void
+gst_rtmp_chunk_set_timestamp (GstRtmpChunk * chunk, guint32 timestamp)
+{
+ chunk->timestamp = timestamp;
+}
+
+void
+gst_rtmp_chunk_set_payload (GstRtmpChunk * chunk, GBytes * payload)
+{
+ if (chunk->payload) {
+ g_bytes_unref (chunk->payload);
+ }
+ chunk->payload = payload;
+}
+
+guint32
+gst_rtmp_chunk_get_stream_id (GstRtmpChunk * chunk)
+{
+ return chunk->stream_id;
+}
+
+guint32
+gst_rtmp_chunk_get_timestamp (GstRtmpChunk * chunk)
+{
+ return chunk->timestamp;
+}
+
+GBytes *
+gst_rtmp_chunk_get_payload (GstRtmpChunk * chunk)
+{
+ return chunk->payload;
+}
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 5e799ee..68edfcb 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -20,6 +20,7 @@
#ifndef _GST_RTMP_CHUNK_H_
#define _GST_RTMP_CHUNK_H_
+#include <glib.h>
G_BEGIN_DECLS
@@ -36,13 +37,12 @@ struct _GstRtmpChunk
{
GObject object;
- guint32 timestamp;
- int type_id;
- guint32 chunk_stream_id;
guint32 stream_id;
- int length;
- guint8 *data;
+ guint32 timestamp;
+ gsize message_length;
+ int message_type_id;
+ GBytes *payload;
};
struct _GstRtmpChunkClass
@@ -50,8 +50,28 @@ struct _GstRtmpChunkClass
GObjectClass object_class;
};
+typedef enum {
+ GST_RTMP_CHUNK_PARSE_ERROR = 0,
+ GST_RTMP_CHUNK_PARSE_OK,
+ GST_RTMP_CHUNK_PARSE_UNKNOWN,
+ GST_RTMP_CHUNK_PARSE_NEED_BYTES,
+} GstRtmpChunkParseStatus;
+
GType gst_rtmp_chunk_get_type (void);
+GstRtmpChunk *gst_rtmp_chunk_new (void);
+GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes,
+ gsize *chunk_size);
+GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size);
+
+void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id);
+void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp);
+void gst_rtmp_chunk_set_payload (GstRtmpChunk *chunk, GBytes *payload);
+
+guint32 gst_rtmp_chunk_get_stream_id (GstRtmpChunk *chunk);
+guint32 gst_rtmp_chunk_get_timestamp (GstRtmpChunk *chunk);
+GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk);
+
G_END_DECLS
#endif
diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c
index 0b089b4..4996073 100644
--- a/rtmp/rtmpserver.c
+++ b/rtmp/rtmpserver.c
@@ -22,8 +22,7 @@
#endif
#include <gst/gst.h>
-#include "rtmpserver.h"
-#include "rtmpserverconnection.h"
+#include <rtmp/rtmpserver.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_server_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_server_debug_category
@@ -62,6 +61,14 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass)
gobject_class->dispose = gst_rtmp_server_dispose;
gobject_class->finalize = gst_rtmp_server_finalize;
+ g_signal_new ("add-connection", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass,
+ add_connection), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION);
+ g_signal_new ("remove-connection", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerClass,
+ remove_connection), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_SERVER_CONNECTION);
}
static void
@@ -108,6 +115,13 @@ gst_rtmp_server_dispose (GObject * object)
GST_DEBUG_OBJECT (rtmpserver, "dispose");
/* clean up as possible. may be called multiple times */
+ g_list_free_full (rtmpserver->connections, g_object_unref);
+ rtmpserver->connections = NULL;
+
+ if (rtmpserver->socket_service) {
+ g_object_unref (rtmpserver->socket_service);
+ rtmpserver->socket_service = NULL;
+ }
G_OBJECT_CLASS (gst_rtmp_server_parent_class)->dispose (object);
}
@@ -144,8 +158,8 @@ gst_rtmp_server_start (GstRtmpServer * rtmpserver)
rtmpserver->socket_service = g_socket_service_new ();
ret =
- g_socket_listener_add_inet_port (G_SOCKET_LISTENER (rtmpserver->
- socket_service), rtmpserver->port, NULL, &error);
+ g_socket_listener_add_inet_port (G_SOCKET_LISTENER
+ (rtmpserver->socket_service), rtmpserver->port, NULL, &error);
if (!ret) {
GST_ERROR ("failed to add address: %s", error->message);
g_object_unref (rtmpserver->socket_service);
@@ -168,8 +182,25 @@ gst_rtmp_server_incoming (GSocketService * service,
g_object_ref (connection);
server_connection = gst_rtmp_server_connection_new (connection);
- (void) server_connection;
- (void) rtmpserver;
+ gst_rtmp_server_add_connection (rtmpserver, server_connection);
return TRUE;
}
+
+void
+gst_rtmp_server_add_connection (GstRtmpServer * rtmpserver,
+ GstRtmpServerConnection * connection)
+{
+ rtmpserver->connections = g_list_prepend (rtmpserver->connections,
+ connection);
+ g_signal_emit_by_name (rtmpserver, "add-connection", connection);
+}
+
+void
+gst_rtmp_server_remove_connection (GstRtmpServer * rtmpserver,
+ GstRtmpServerConnection * connection)
+{
+ rtmpserver->connections = g_list_remove (rtmpserver->connections, connection);
+ g_signal_emit_by_name (rtmpserver, "remove-connection", connection);
+ g_object_unref (connection);
+}
diff --git a/rtmp/rtmpserver.h b/rtmp/rtmpserver.h
index e1e0466..75c71e3 100644
--- a/rtmp/rtmpserver.h
+++ b/rtmp/rtmpserver.h
@@ -22,6 +22,8 @@
#include <gio/gio.h>
+#include <rtmp/rtmpserverconnection.h>
+
G_BEGIN_DECLS
#define GST_TYPE_RTMP_SERVER (gst_rtmp_server_get_type())
@@ -42,18 +44,29 @@ struct _GstRtmpServer
/* private */
GSocketService *socket_service;
+ GList *connections;
};
struct _GstRtmpServerClass
{
GObjectClass object_class;
+
+ /* signals */
+ void (*add_connection) (GstRtmpServer *server,
+ GstRtmpServerConnection *connection);
+ void (*remove_connection) (GstRtmpServer *server,
+ GstRtmpServerConnection *connection);
};
GType gst_rtmp_server_get_type (void);
GstRtmpServer *gst_rtmp_server_new (void);
void gst_rtmp_server_start (GstRtmpServer * rtmpserver);
+void gst_rtmp_server_add_connection (GstRtmpServer *rtmpserver,
+ GstRtmpServerConnection *connection);
+void gst_rtmp_server_remove_connection (GstRtmpServer *rtmpserver,
+ GstRtmpServerConnection *connection);
G_END_DECLS
diff --git a/rtmp/rtmpserverconnection.c b/rtmp/rtmpserverconnection.c
index b3b08da..ef79d19 100644
--- a/rtmp/rtmpserverconnection.c
+++ b/rtmp/rtmpserverconnection.c
@@ -23,6 +23,7 @@
#include <gst/gst.h>
#include "rtmpserverconnection.h"
+#include "rtmpchunk.h"
#include "amf.h"
#include <string.h>
@@ -66,6 +67,10 @@ gst_rtmp_server_connection_class_init (GstRtmpServerConnectionClass * klass)
gobject_class->dispose = gst_rtmp_server_connection_dispose;
gobject_class->finalize = gst_rtmp_server_connection_finalize;
+ g_signal_new ("got-chunk", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtmpServerConnectionClass,
+ got_chunk), NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_RTMP_CHUNK);
}
static void
@@ -147,10 +152,6 @@ gst_rtmp_server_connection_new (GSocketConnection * connection)
return sc;
}
-static void
-gst_rtmp_server_connection_read_chunk_done (GObject * obj,
- GAsyncResult * res, gpointer user_data);
-
typedef struct _ChunkRead ChunkRead;
struct _ChunkRead
{
@@ -229,21 +230,15 @@ static void
gst_rtmp_server_connection_read_chunk (GstRtmpServerConnection * sc)
{
GInputStream *is;
- ChunkRead *chunk;
-
- chunk = g_malloc0 (sizeof (ChunkRead));
- chunk->alloc_size = 4096;
- chunk->data = g_malloc (chunk->alloc_size);
- chunk->server_connection = sc;
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
- g_input_stream_read_async (is, chunk->data, chunk->alloc_size,
+ g_input_stream_read_bytes_async (is, 4096,
G_PRIORITY_DEFAULT, sc->cancellable,
- gst_rtmp_server_connection_read_chunk_done, chunk);
+ gst_rtmp_server_connection_read_chunk_done, sc);
}
-static void
+G_GNUC_UNUSED static void
parse_message (guint8 * data, int size)
{
int offset;
@@ -275,99 +270,38 @@ parse_message (guint8 * data, int size)
}
-typedef struct _Chunk Chunk;
-struct _Chunk
-{
- int stream_id;
- int header_fmt;
- guint32 timestamp;
- int message_length;
- int message_type_id;
- guint32 message_stream_id;
-};
-
-static void
-parse_chunk (guint8 * data, int size)
-{
- Chunk _chunk, *chunk = &_chunk;
- int offset;
-
- chunk->header_fmt = data[0] >> 6;
- chunk->stream_id = data[0] & 0x3f;
- offset = 1;
- if (chunk->stream_id == 0) {
- chunk->stream_id = 64 + data[1];
- offset = 2;
- } else if (chunk->stream_id == 1) {
- chunk->stream_id = 64 + data[1] + (data[2] << 8);
- offset = 3;
- }
- chunk->timestamp =
- (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
- chunk->message_length =
- (data[offset + 3] << 16) | (data[offset + 4] << 8) | data[offset + 5];
- chunk->message_type_id = data[offset + 6];
- offset += 7;
-
- g_print ("header_fmt: %d\n", chunk->header_fmt);
- g_print ("stream_id: %d\n", chunk->stream_id);
- g_print ("timestamp: %d\n", chunk->timestamp);
- g_print ("message_length: %d\n", chunk->message_length);
- g_print ("message_type_id: %d\n", chunk->message_type_id);
-
- parse_message (data + offset, size - offset);
-}
-
-static void
-dump_data (guint8 * data, int size)
-{
- int i, j;
- for (i = 0; i < size; i += 16) {
- g_print ("%04x: ", i);
- for (j = 0; j < 16; j++) {
- if (i + j < size) {
- g_print ("%02x ", data[i + j]);
- } else {
- g_print (" ");
- }
- }
- for (j = 0; j < 16; j++) {
- if (i + j < size) {
- g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.');
- }
- }
- g_print ("\n");
- }
-}
-
static void
gst_rtmp_server_connection_read_chunk_done (GObject * obj,
GAsyncResult * res, gpointer user_data)
{
GInputStream *is = G_INPUT_STREAM (obj);
- ChunkRead *chunk = (ChunkRead *) user_data;
+ GstRtmpServerConnection *server_connection =
+ GST_RTMP_SERVER_CONNECTION (user_data);
+ GstRtmpChunk *chunk;
GError *error = NULL;
- gssize ret;
+ gsize chunk_size;
+ GBytes *bytes;
GST_ERROR ("gst_rtmp_server_connection_read_chunk_done");
- ret = g_input_stream_read_finish (is, res, &error);
- if (ret < 0) {
+ bytes = g_input_stream_read_bytes_finish (is, res, &error);
+ if (bytes == NULL) {
GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return;
}
- GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", ret);
- chunk->size = ret;
+ GST_ERROR ("read %" G_GSSIZE_FORMAT " bytes", g_bytes_get_size (bytes));
+
+ chunk = gst_rtmp_chunk_new_parse (bytes, &chunk_size);
- parse_chunk (chunk->data, chunk->size);
- dump_data (chunk->data, chunk->size);
+ if (chunk) {
+ g_signal_emit_by_name (server_connection, "got-chunk", chunk);
- if (0)
- proxy_write_chunk (chunk->server_connection, chunk);
+ g_object_unref (chunk);
+ }
}
-static void
+G_GNUC_UNUSED static void
proxy_write_chunk (GstRtmpServerConnection * sc, ChunkRead * chunk)
{
GOutputStream *os;
diff --git a/rtmp/rtmpserverconnection.h b/rtmp/rtmpserverconnection.h
index ab1f53c..19f06ab 100644
--- a/rtmp/rtmpserverconnection.h
+++ b/rtmp/rtmpserverconnection.h
@@ -21,6 +21,7 @@
#define _GST_RTMP_SERVER_CONNECTION_H_
#include <gio/gio.h>
+#include <rtmp/rtmpchunk.h>
G_BEGIN_DECLS
@@ -48,6 +49,10 @@ struct _GstRtmpServerConnection
struct _GstRtmpServerConnectionClass
{
GObjectClass object_class;
+
+ /* signals */
+ void (*got_chunk) (GstRtmpServerConnection *connection,
+ GstRtmpChunk *chunk);
};
GType gst_rtmp_server_connection_get_type (void);
diff --git a/tools/client-test.c b/tools/client-test.c
index 8a0e9f4..1d8b0ec 100644
--- a/tools/client-test.c
+++ b/tools/client-test.c
@@ -36,7 +36,7 @@ static GOptionEntry entries[] = {
};
static void
-connect_done (GObject *source, GAsyncResult *result, gpointer user_data);
+connect_done (GObject * source, GAsyncResult * result, gpointer user_data);
int
main (int argc, char *argv[])
@@ -62,8 +62,7 @@ main (int argc, char *argv[])
main_loop = g_main_loop_new (NULL, TRUE);
- gst_rtmp_client_connect_async (client, cancellable, connect_done,
- client);
+ gst_rtmp_client_connect_async (client, cancellable, connect_done, client);
g_main_loop_run (main_loop);
@@ -71,7 +70,7 @@ main (int argc, char *argv[])
}
static void
-connect_done (GObject *source, GAsyncResult *result, gpointer user_data)
+connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
{
GstRtmpClient *client = user_data;
GError *error = NULL;
@@ -79,10 +78,9 @@ connect_done (GObject *source, GAsyncResult *result, gpointer user_data)
ret = gst_rtmp_client_connect_finish (client, result, &error);
if (!ret) {
- GST_ERROR("error: %s", error->message);
+ GST_ERROR ("error: %s", error->message);
g_error_free (error);
}
- GST_ERROR("got here");
+ GST_ERROR ("got here");
}
-
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 3920d5f..37629af 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -28,6 +28,15 @@
#define GETTEXT_PACKAGE NULL
+static void
+add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection,
+ gpointer user_data);
+static void
+got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data);
+static void dump_data (GBytes * bytes);
+
+
gboolean verbose;
static GOptionEntry entries[] = {
@@ -54,6 +63,8 @@ main (int argc, char *argv[])
g_option_context_free (context);
server = gst_rtmp_server_new ();
+ g_signal_connect (server, "add-connection", G_CALLBACK (add_connection),
+ NULL);
gst_rtmp_server_start (server);
main_loop = g_main_loop_new (NULL, TRUE);
@@ -62,3 +73,49 @@ main (int argc, char *argv[])
exit (0);
}
+static void
+add_connection (GstRtmpServer * server, GstRtmpServerConnection * connection,
+ gpointer user_data)
+{
+ GST_ERROR ("new connection");
+
+ g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL);
+}
+
+static void
+got_chunk (GstRtmpServerConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data)
+{
+ GBytes *bytes;
+
+ GST_ERROR ("got chunk");
+
+ bytes = gst_rtmp_chunk_get_payload (chunk);
+ dump_data (bytes);
+}
+
+static void
+dump_data (GBytes * bytes)
+{
+ const guint8 *data;
+ gsize size;
+ int i, j;
+
+ data = g_bytes_get_data (bytes, &size);
+ for (i = 0; i < size; i += 16) {
+ g_print ("%04x: ", i);
+ for (j = 0; j < 16; j++) {
+ if (i + j < size) {
+ g_print ("%02x ", data[i + j]);
+ } else {
+ g_print (" ");
+ }
+ }
+ for (j = 0; j < 16; j++) {
+ if (i + j < size) {
+ g_print ("%c", g_ascii_isprint (data[i + j]) ? data[i + j] : '.');
+ }
+ }
+ g_print ("\n");
+ }
+}