summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-24 22:18:03 -0700
committerDavid Schleef <ds@schleef.org>2014-08-24 22:18:03 -0700
commit3574a53177db9e165a4a952020228703231d910d (patch)
treee1dc7c3ed8217800c41e182ad756c2480a2b967b
parent2f90cf720e4cf7e27a375dbf153614eb9242bf24 (diff)
hacking
-rw-r--r--rtmp/rtmpchunk.c25
-rw-r--r--rtmp/rtmpchunk.h1
-rw-r--r--rtmp/rtmpclient.c4
-rw-r--r--rtmp/rtmpconnection.c50
-rw-r--r--rtmp/rtmpconnection.h1
-rw-r--r--tools/proxy-server.c26
6 files changed, 81 insertions, 26 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index 32d9b1e..2a8fe7b 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -23,6 +23,7 @@
#include <gst/gst.h>
#include "rtmpchunk.h"
+#include "rtmputils.h"
#include <string.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_debug_category);
@@ -142,8 +143,8 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes)
if (size < 1)
return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- header_fmt = data[0] >> 6;
- chunk->stream_id = data[0] & 0x3f;
+ header_fmt = data[0] >> 6; /* librtmp: m_headerType */
+ chunk->stream_id = data[0] & 0x3f; /* librtmp: m_nChannel */
offset = 1;
if (chunk->stream_id == 0) {
if (size < 2)
@@ -159,23 +160,27 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes)
if (header_fmt == 0) {
if (size < offset + 11)
return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ /* librtmp: m_nTimeStamp */
chunk->timestamp =
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
+ /* librtmp: m_nBodySize */
chunk->message_length =
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
if (chunk->timestamp == 0xffffff) {
- if (size < offset + 4)
+ if (size < offset + 4 + 1 + 4)
return GST_RTMP_CHUNK_PARSE_UNKNOWN;
chunk->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) |
(data[offset + 2] << 8) | data[offset + 3];
offset += 4;
}
- chunk->message_type_id = data[offset];
+ chunk->message_type_id = data[offset]; /* librtmp: m_packetType */
offset += 1;
- /* 4 byte something here */
+ /* librtmp: m_nInfoField2 */
+ chunk->info = (data[offset] << 24) | (data[offset + 1] << 16) |
+ (data[offset + 2] << 8) | data[offset + 3];
offset += 4;
} else if (header_fmt == 1) {
if (size < offset + 7)
@@ -204,6 +209,16 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes)
if (size < offset + chunk->message_length)
return GST_RTMP_CHUNK_PARSE_NEED_BYTES;
+#if 0
+ {
+ GBytes *b;
+ GST_ERROR ("PARSED CHUNK:");
+ b = g_bytes_new_from_bytes (bytes, 0, offset + chunk->message_length);
+ gst_rtmp_dump_data (b);
+ g_bytes_unref (b);
+ }
+#endif
+
chunk->payload =
g_bytes_new_from_bytes (bytes, offset, chunk->message_length);
return GST_RTMP_CHUNK_PARSE_OK;
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 1165fb7..834e217 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -41,6 +41,7 @@ struct _GstRtmpChunk
guint32 timestamp;
gsize message_length;
int message_type_id;
+ guint32 info;
GBytes *payload;
gpointer priv;
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 219feba..2a53187 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -177,7 +177,9 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
client->cancellable = cancellable;
client->async = async;
- addr = g_network_address_new ("localhost", 1935);
+ addr =
+ g_network_address_new
+ ("ec2-54-188-128-44.us-west-2.compute.amazonaws.com", 1935);
client->socket_client = g_socket_client_new ();
GST_DEBUG ("g_socket_client_connect_async");
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 26f8538..625f721 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -66,6 +66,7 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection,
void (*input_callback) (GstRtmpConnection * connection),
gsize needed_bytes);
static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc);
+static void gst_rtmp_connection_start_output (GstRtmpConnection * sc);
static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
@@ -164,7 +165,6 @@ gst_rtmp_connection_new (GSocketConnection * connection)
{
GstRtmpConnection *sc;
GInputStream *is;
- GOutputStream *os;
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
sc->connection = connection;
@@ -177,14 +177,26 @@ gst_rtmp_connection_new (GSocketConnection * connection)
(GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL);
g_source_attach (sc->input_source, NULL);
+
+ return sc;
+}
+
+static void
+gst_rtmp_connection_start_output (GstRtmpConnection * sc)
+{
+ GSource *source;
+ GOutputStream *os;
+
+ if (!sc->handshake_complete)
+ return;
+
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- sc->output_source =
+ source =
g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (os),
sc->cancellable);
- g_source_set_callback (sc->output_source,
- (GSourceFunc) gst_rtmp_connection_output_ready, sc, NULL);
-
- return sc;
+ g_source_set_callback (source, (GSourceFunc) gst_rtmp_connection_output_ready,
+ sc, NULL);
+ g_source_attach (source, NULL);
}
static gboolean
@@ -236,6 +248,11 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
GST_ERROR ("output ready");
+ if (sc->writing) {
+ GST_ERROR ("busy writing");
+ return G_SOURCE_REMOVE;
+ }
+
chunk = g_queue_pop_head (sc->output_queue);
if (!chunk) {
return G_SOURCE_REMOVE;
@@ -244,10 +261,11 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
sc->writing = TRUE;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
bytes = gst_rtmp_chunk_serialize (chunk);
- gst_rtmp_dump_data (bytes);
+ //gst_rtmp_dump_data (bytes);
g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
+ //return G_SOURCE_CONTINUE;
return G_SOURCE_REMOVE;
}
@@ -274,9 +292,7 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
GST_ERROR ("write %" G_GSSIZE_FORMAT " bytes", ret);
g_object_unref (chunk);
- if (g_source_get_context (connection->output_source) == NULL) {
- g_source_attach (connection->output_source, NULL);
- }
+ gst_rtmp_connection_start_output (connection);
}
@@ -389,10 +405,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc)
/* handshake finished */
GST_ERROR ("server handshake finished");
+ sc->handshake_complete = TRUE;
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
+ gst_rtmp_connection_start_output (sc);
}
static void
@@ -406,9 +424,6 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
if (sc->input_bytes == NULL)
break;
- GST_ERROR ("parsing %" G_GSIZE_FORMAT " bytes",
- g_bytes_get_size (sc->input_bytes));
- gst_rtmp_dump_data (sc->input_bytes);
chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size);
if (chunk == NULL)
@@ -434,9 +449,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
chunk->priv = connection;
g_queue_push_tail (connection->output_queue, chunk);
- if (connection->handshake_complete) {
- g_source_attach (connection->output_source, NULL);
- }
+ gst_rtmp_connection_start_output (connection);
}
static void
@@ -566,10 +579,9 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
/* handshake finished */
GST_ERROR ("client handshake finished");
+ sc->handshake_complete = TRUE;
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
- if (!g_queue_is_empty (sc->output_queue)) {
- g_source_attach (sc->output_source, NULL);
- }
+ gst_rtmp_connection_start_output (sc);
}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index 5fa78d7..21a057e 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -53,7 +53,6 @@ struct _GstRtmpConnection
gboolean writing;
GSource *input_source;
- GSource *output_source;
GBytes *input_bytes;
gsize input_needed_bytes;
GstRtmpConnectionCallback input_callback;
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 5abc013..11276dd 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -38,9 +38,13 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data);
static void
connect_done (GObject * source, GAsyncResult * result, gpointer user_data);
+static void
+got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data);
GstRtmpServer *server;
GstRtmpClient *client;
+GstRtmpConnection *client_connection;
GCancellable *cancellable;
GstRtmpChunk *proxy_chunk;
@@ -92,6 +96,8 @@ add_connection (GstRtmpServer * server, GstRtmpConnection * connection,
g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL);
gst_rtmp_client_connect_async (client, cancellable, connect_done, client);
+
+ client_connection = connection;
}
static void
@@ -110,8 +116,10 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
g_object_ref (chunk);
if (proxy_conn) {
+ GST_ERROR ("sending to server: %" G_GSIZE_FORMAT, chunk->message_length);
gst_rtmp_connection_queue_chunk (proxy_conn, chunk);
} else {
+ GST_ERROR ("saving first chunk");
/* save it for after the connection is complete */
proxy_chunk = chunk;
}
@@ -121,6 +129,7 @@ static void
connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
{
GstRtmpClient *client = user_data;
+ GstRtmpConnection *proxy_conn;
GError *error = NULL;
gboolean ret;
@@ -135,7 +144,24 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
GstRtmpConnection *proxy_conn;
proxy_conn = gst_rtmp_client_get_connection (client);
+ GST_ERROR ("sending to server: %" G_GSIZE_FORMAT,
+ proxy_chunk->message_length);
gst_rtmp_connection_queue_chunk (proxy_conn, proxy_chunk);
proxy_chunk = NULL;
}
+
+ proxy_conn = gst_rtmp_client_get_connection (client);
+ g_signal_connect (proxy_conn, "got-chunk", G_CALLBACK (got_chunk_proxy),
+ NULL);
+}
+
+static void
+got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk,
+ gpointer user_data)
+{
+ GST_INFO ("got chunk");
+
+ g_object_ref (chunk);
+ GST_ERROR ("sending to client: %" G_GSIZE_FORMAT, chunk->message_length);
+ gst_rtmp_connection_queue_chunk (client_connection, chunk);
}