diff options
author | David Schleef <ds@schleef.org> | 2014-08-24 22:18:03 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-24 22:18:03 -0700 |
commit | 3574a53177db9e165a4a952020228703231d910d (patch) | |
tree | e1dc7c3ed8217800c41e182ad756c2480a2b967b | |
parent | 2f90cf720e4cf7e27a375dbf153614eb9242bf24 (diff) |
hacking
-rw-r--r-- | rtmp/rtmpchunk.c | 25 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 1 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 4 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 50 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 1 | ||||
-rw-r--r-- | tools/proxy-server.c | 26 |
6 files changed, 81 insertions, 26 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index 32d9b1e..2a8fe7b 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -23,6 +23,7 @@ #include <gst/gst.h> #include "rtmpchunk.h" +#include "rtmputils.h" #include <string.h> GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_debug_category); @@ -142,8 +143,8 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes) if (size < 1) return GST_RTMP_CHUNK_PARSE_UNKNOWN; - header_fmt = data[0] >> 6; - chunk->stream_id = data[0] & 0x3f; + header_fmt = data[0] >> 6; /* librtmp: m_headerType */ + chunk->stream_id = data[0] & 0x3f; /* librtmp: m_nChannel */ offset = 1; if (chunk->stream_id == 0) { if (size < 2) @@ -159,23 +160,27 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes) if (header_fmt == 0) { if (size < offset + 11) return GST_RTMP_CHUNK_PARSE_UNKNOWN; + /* librtmp: m_nTimeStamp */ chunk->timestamp = (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; + /* librtmp: m_nBodySize */ chunk->message_length = (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; if (chunk->timestamp == 0xffffff) { - if (size < offset + 4) + if (size < offset + 4 + 1 + 4) return GST_RTMP_CHUNK_PARSE_UNKNOWN; chunk->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; offset += 4; } - chunk->message_type_id = data[offset]; + chunk->message_type_id = data[offset]; /* librtmp: m_packetType */ offset += 1; - /* 4 byte something here */ + /* librtmp: m_nInfoField2 */ + chunk->info = (data[offset] << 24) | (data[offset + 1] << 16) | + (data[offset + 2] << 8) | data[offset + 3]; offset += 4; } else if (header_fmt == 1) { if (size < offset + 7) @@ -204,6 +209,16 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes) if (size < offset + chunk->message_length) return GST_RTMP_CHUNK_PARSE_NEED_BYTES; +#if 0 + { + GBytes *b; + GST_ERROR ("PARSED CHUNK:"); + b = g_bytes_new_from_bytes (bytes, 0, offset + chunk->message_length); + gst_rtmp_dump_data (b); + g_bytes_unref (b); + } +#endif + chunk->payload = g_bytes_new_from_bytes (bytes, offset, chunk->message_length); return GST_RTMP_CHUNK_PARSE_OK; diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 1165fb7..834e217 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -41,6 +41,7 @@ struct _GstRtmpChunk guint32 timestamp; gsize message_length; int message_type_id; + guint32 info; GBytes *payload; gpointer priv; diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 219feba..2a53187 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -177,7 +177,9 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, client->cancellable = cancellable; client->async = async; - addr = g_network_address_new ("localhost", 1935); + addr = + g_network_address_new + ("ec2-54-188-128-44.us-west-2.compute.amazonaws.com", 1935); client->socket_client = g_socket_client_new (); GST_DEBUG ("g_socket_client_connect_async"); diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 26f8538..625f721 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -66,6 +66,7 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection, void (*input_callback) (GstRtmpConnection * connection), gsize needed_bytes); static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc); +static void gst_rtmp_connection_start_output (GstRtmpConnection * sc); static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); @@ -164,7 +165,6 @@ gst_rtmp_connection_new (GSocketConnection * connection) { GstRtmpConnection *sc; GInputStream *is; - GOutputStream *os; sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); sc->connection = connection; @@ -177,14 +177,26 @@ gst_rtmp_connection_new (GSocketConnection * connection) (GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL); g_source_attach (sc->input_source, NULL); + + return sc; +} + +static void +gst_rtmp_connection_start_output (GstRtmpConnection * sc) +{ + GSource *source; + GOutputStream *os; + + if (!sc->handshake_complete) + return; + os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - sc->output_source = + source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (os), sc->cancellable); - g_source_set_callback (sc->output_source, - (GSourceFunc) gst_rtmp_connection_output_ready, sc, NULL); - - return sc; + g_source_set_callback (source, (GSourceFunc) gst_rtmp_connection_output_ready, + sc, NULL); + g_source_attach (source, NULL); } static gboolean @@ -236,6 +248,11 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) GST_ERROR ("output ready"); + if (sc->writing) { + GST_ERROR ("busy writing"); + return G_SOURCE_REMOVE; + } + chunk = g_queue_pop_head (sc->output_queue); if (!chunk) { return G_SOURCE_REMOVE; @@ -244,10 +261,11 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) sc->writing = TRUE; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); bytes = gst_rtmp_chunk_serialize (chunk); - gst_rtmp_dump_data (bytes); + //gst_rtmp_dump_data (bytes); g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); + //return G_SOURCE_CONTINUE; return G_SOURCE_REMOVE; } @@ -274,9 +292,7 @@ gst_rtmp_connection_write_chunk_done (GObject * obj, GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret); g_object_unref (chunk); - if (g_source_get_context (connection->output_source) == NULL) { - g_source_attach (connection->output_source, NULL); - } + gst_rtmp_connection_start_output (connection); } @@ -389,10 +405,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc) /* handshake finished */ GST_ERROR ("server handshake finished"); + sc->handshake_complete = TRUE; gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); + gst_rtmp_connection_start_output (sc); } static void @@ -406,9 +424,6 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) if (sc->input_bytes == NULL) break; - GST_ERROR ("parsing %" G_GSIZE_FORMAT " bytes", - g_bytes_get_size (sc->input_bytes)); - gst_rtmp_dump_data (sc->input_bytes); chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size); if (chunk == NULL) @@ -434,9 +449,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, chunk->priv = connection; g_queue_push_tail (connection->output_queue, chunk); - if (connection->handshake_complete) { - g_source_attach (connection->output_source, NULL); - } + gst_rtmp_connection_start_output (connection); } static void @@ -566,10 +579,9 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj, /* handshake finished */ GST_ERROR ("client handshake finished"); + sc->handshake_complete = TRUE; gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); - if (!g_queue_is_empty (sc->output_queue)) { - g_source_attach (sc->output_source, NULL); - } + gst_rtmp_connection_start_output (sc); } diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index 5fa78d7..21a057e 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -53,7 +53,6 @@ struct _GstRtmpConnection gboolean writing; GSource *input_source; - GSource *output_source; GBytes *input_bytes; gsize input_needed_bytes; GstRtmpConnectionCallback input_callback; diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 5abc013..11276dd 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -38,9 +38,13 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data); static void connect_done (GObject * source, GAsyncResult * result, gpointer user_data); +static void +got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data); GstRtmpServer *server; GstRtmpClient *client; +GstRtmpConnection *client_connection; GCancellable *cancellable; GstRtmpChunk *proxy_chunk; @@ -92,6 +96,8 @@ add_connection (GstRtmpServer * server, GstRtmpConnection * connection, g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL); gst_rtmp_client_connect_async (client, cancellable, connect_done, client); + + client_connection = connection; } static void @@ -110,8 +116,10 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, g_object_ref (chunk); if (proxy_conn) { + GST_ERROR ("sending to server: %" G_GSIZE_FORMAT, chunk->message_length); gst_rtmp_connection_queue_chunk (proxy_conn, chunk); } else { + GST_ERROR ("saving first chunk"); /* save it for after the connection is complete */ proxy_chunk = chunk; } @@ -121,6 +129,7 @@ static void connect_done (GObject * source, GAsyncResult * result, gpointer user_data) { GstRtmpClient *client = user_data; + GstRtmpConnection *proxy_conn; GError *error = NULL; gboolean ret; @@ -135,7 +144,24 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data) GstRtmpConnection *proxy_conn; proxy_conn = gst_rtmp_client_get_connection (client); + GST_ERROR ("sending to server: %" G_GSIZE_FORMAT, + proxy_chunk->message_length); gst_rtmp_connection_queue_chunk (proxy_conn, proxy_chunk); proxy_chunk = NULL; } + + proxy_conn = gst_rtmp_client_get_connection (client); + g_signal_connect (proxy_conn, "got-chunk", G_CALLBACK (got_chunk_proxy), + NULL); +} + +static void +got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk, + gpointer user_data) +{ + GST_INFO ("got chunk"); + + g_object_ref (chunk); + GST_ERROR ("sending to client: %" G_GSIZE_FORMAT, chunk->message_length); + gst_rtmp_connection_queue_chunk (client_connection, chunk); } |