summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-25 10:46:22 -0700
committerDavid Schleef <ds@schleef.org>2014-08-25 10:46:22 -0700
commit2525802c534bb87145088a5c69a6df2ff3f57542 (patch)
tree04edb707b9add478ba9f3536b77109ae902c50c1
parent3574a53177db9e165a4a952020228703231d910d (diff)
hacking
-rw-r--r--rtmp/rtmpchunk.c185
-rw-r--r--rtmp/rtmpchunk.h26
-rw-r--r--rtmp/rtmpconnection.c93
-rw-r--r--rtmp/rtmpconnection.h3
-rw-r--r--rtmp/rtmpserver.c2
-rw-r--r--tools/proxy-server.c32
6 files changed, 268 insertions, 73 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index 2a8fe7b..8f27e37 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -129,7 +129,8 @@ gst_rtmp_chunk_new (void)
}
static GstRtmpChunkParseStatus
-chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes)
+chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes,
+ GstRtmpChunkCache * cache)
{
int offset;
const guint8 *data;
@@ -182,69 +183,91 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes)
chunk->info = (data[offset] << 24) | (data[offset + 1] << 16) |
(data[offset + 2] << 8) | data[offset + 3];
offset += 4;
- } else if (header_fmt == 1) {
- if (size < offset + 7)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- GST_ERROR ("unimplemented: need previous chunk");
- chunk->timestamp =
- (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
- offset += 3;
- chunk->message_length =
- (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
- offset += 3;
- } else if (header_fmt == 2) {
- if (size < offset + 3)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- chunk->timestamp = 0;
- GST_ERROR ("unimplemented: need previous chunk");
- return GST_RTMP_CHUNK_PARSE_ERROR;
} else {
- chunk->timestamp = 0;
- GST_ERROR ("unimplemented: need previous chunk");
- return GST_RTMP_CHUNK_PARSE_ERROR;
- }
+ GstRtmpChunkCacheEntry *cached;
+
+ cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
+ if (cached) {
+ chunk->timestamp = cached->timestamp;
+ chunk->message_length = cached->message_length;
+ chunk->message_type_id = cached->message_type_id;
+ chunk->info = cached->info;
+ } else {
+ GST_ERROR ("uncached chunk, stream_id = %d", chunk->stream_id);
+ }
- if (needed_bytes)
- *needed_bytes = offset + chunk->message_length;
- if (size < offset + chunk->message_length)
- return GST_RTMP_CHUNK_PARSE_NEED_BYTES;
+ if (header_fmt == 1) {
+ if (size < offset + 7)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->timestamp +=
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ chunk->message_length =
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ chunk->message_type_id = data[offset];
+ offset += 1;
+ } else if (header_fmt == 2) {
+ if (size < offset + 3)
+ return GST_RTMP_CHUNK_PARSE_UNKNOWN;
+ chunk->timestamp +=
+ (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
+ offset += 3;
+ } else {
+ /* ok */
+ }
+ }
-#if 0
- {
+ /* WTF */
+ if ((header_fmt < 2) &&
+ (chunk->message_type_id == 0x12 || chunk->message_type_id == 0x09)) {
+ chunk->message_length += (chunk->message_length >> 7);
+ }
+#if 1
+ if (1) {
GBytes *b;
GST_ERROR ("PARSED CHUNK:");
- b = g_bytes_new_from_bytes (bytes, 0, offset + chunk->message_length);
+ b = g_bytes_new_from_bytes (bytes, 0, size);
gst_rtmp_dump_data (b);
g_bytes_unref (b);
+ //*(int *)0 = 1;
}
#endif
+ if (needed_bytes)
+ *needed_bytes = offset + chunk->message_length;
+ if (size < offset + chunk->message_length)
+ return GST_RTMP_CHUNK_PARSE_NEED_BYTES;
+
chunk->payload =
g_bytes_new_from_bytes (bytes, offset, chunk->message_length);
return GST_RTMP_CHUNK_PARSE_OK;
}
GstRtmpChunkParseStatus
-gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size)
+gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size,
+ GstRtmpChunkCache * cache)
{
GstRtmpChunk *chunk;
GstRtmpChunkParseStatus status;
chunk = gst_rtmp_chunk_new ();
- status = chunk_parse (chunk, bytes, chunk_size);
+ status = chunk_parse (chunk, bytes, chunk_size, cache);
g_object_unref (chunk);
return status;
}
GstRtmpChunk *
-gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size)
+gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size,
+ GstRtmpChunkCache * cache)
{
GstRtmpChunk *chunk;
GstRtmpChunkParseStatus status;
chunk = gst_rtmp_chunk_new ();
- status = chunk_parse (chunk, bytes, chunk_size);
+ status = chunk_parse (chunk, bytes, chunk_size, cache);
+ GST_ERROR ("status %d", status);
if (status == GST_RTMP_CHUNK_PARSE_OK)
return chunk;
@@ -253,34 +276,56 @@ gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size)
}
GBytes *
-gst_rtmp_chunk_serialize (GstRtmpChunk * chunk)
+gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache)
{
guint8 *data;
const guint8 *chunkdata;
gsize chunksize;
int header_fmt;
+ GstRtmpChunkCacheEntry *cached;
+ guint32 timestamp;
+ int offset;
/* FIXME this is incomplete and inefficient */
chunkdata = g_bytes_get_data (chunk->payload, &chunksize);
data = g_malloc (chunksize + 12);
+
header_fmt = 0;
+ cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
+ if (cached) {
+ header_fmt = 1;
+ timestamp = chunk->timestamp - cached->timestamp;
+ }
+
g_assert (chunk->stream_id < 64);
data[0] = (header_fmt << 6) | (chunk->stream_id);
- g_assert (chunk->timestamp < 0xffffff);
- data[1] = (chunk->timestamp >> 16) & 0xff;
- data[2] = (chunk->timestamp >> 8) & 0xff;
- data[3] = chunk->timestamp & 0xff;
- data[4] = (chunk->message_length >> 16) & 0xff;
- data[5] = (chunk->message_length >> 8) & 0xff;
- data[6] = chunk->message_length & 0xff;
- data[7] = chunk->message_type_id;
- data[8] = 0;
- data[9] = 0;
- data[10] = 0;
- data[11] = 0;
- memcpy (data + 12, chunkdata, chunksize);
-
- return g_bytes_new_take (data, chunksize + 12);
+ if (header_fmt == 0) {
+ g_assert (chunk->timestamp < 0xffffff);
+ data[1] = (chunk->timestamp >> 16) & 0xff;
+ data[2] = (chunk->timestamp >> 8) & 0xff;
+ data[3] = chunk->timestamp & 0xff;
+ data[4] = (chunk->message_length >> 16) & 0xff;
+ data[5] = (chunk->message_length >> 8) & 0xff;
+ data[6] = chunk->message_length & 0xff;
+ data[7] = chunk->message_type_id;
+ data[8] = (chunk->info >> 24) & 0xff;
+ data[9] = (chunk->info >> 16) & 0xff;
+ data[10] = (chunk->info >> 8) & 0xff;
+ data[11] = chunk->info & 0xff;
+ offset = 12;
+ } else {
+ data[1] = (timestamp >> 16) & 0xff;
+ data[2] = (timestamp >> 8) & 0xff;
+ data[3] = timestamp & 0xff;
+ data[4] = (chunk->message_length >> 16) & 0xff;
+ data[5] = (chunk->message_length >> 8) & 0xff;
+ data[6] = chunk->message_length & 0xff;
+ data[7] = chunk->message_type_id;
+ offset = 8;
+ }
+ memcpy (data + offset, chunkdata, chunksize);
+
+ return g_bytes_new_take (data, chunksize + offset);
}
void
@@ -321,3 +366,47 @@ gst_rtmp_chunk_get_payload (GstRtmpChunk * chunk)
{
return chunk->payload;
}
+
+/* chunk cache */
+
+GstRtmpChunkCache *
+gst_rtmp_chunk_cache_new (void)
+{
+ return g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkCacheEntry));
+}
+
+void
+gst_rtmp_chunk_cache_free (GstRtmpChunkCache * cache)
+{
+ g_array_free (cache, TRUE);
+
+}
+
+GstRtmpChunkCacheEntry *
+gst_rtmp_chunk_cache_get (GstRtmpChunkCache * cache, int stream_id)
+{
+ int i;
+ GstRtmpChunkCacheEntry *entry;
+ for (i = 0; i < cache->len; i++) {
+ entry = &g_array_index (cache, GstRtmpChunkCacheEntry, i);
+ if (entry->stream_id == stream_id)
+ return entry;
+ }
+ return NULL;
+}
+
+void
+gst_rtmp_chunk_cache_update (GstRtmpChunkCache * cache, GstRtmpChunk * chunk)
+{
+ GstRtmpChunkCacheEntry *entry;
+ entry = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
+ if (entry == NULL) {
+ g_array_set_size (cache, cache->len + 1);
+ entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1);
+ entry->stream_id = chunk->stream_id;
+ }
+ entry->timestamp = chunk->timestamp;
+ entry->message_length = chunk->message_length;
+ entry->message_type_id = chunk->message_type_id;
+ entry->info = chunk->info;
+}
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 834e217..267e871 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -32,6 +32,15 @@ G_BEGIN_DECLS
typedef struct _GstRtmpChunk GstRtmpChunk;
typedef struct _GstRtmpChunkClass GstRtmpChunkClass;
+typedef GArray GstRtmpChunkCache;
+typedef struct _GstRtmpChunkCacheEntry GstRtmpChunkCacheEntry;
+struct _GstRtmpChunkCacheEntry {
+ guint32 stream_id;
+ guint32 timestamp;
+ gsize message_length;
+ int message_type_id;
+ guint32 info;
+};
struct _GstRtmpChunk
{
@@ -63,9 +72,11 @@ GType gst_rtmp_chunk_get_type (void);
GstRtmpChunk *gst_rtmp_chunk_new (void);
GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes,
- gsize *chunk_size);
-GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size);
-GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk);
+ gsize *chunk_size, GstRtmpChunkCache *cache);
+GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size,
+ GstRtmpChunkCache *cache);
+GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk,
+ GstRtmpChunkCache *cache);
void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id);
void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp);
@@ -75,6 +86,15 @@ guint32 gst_rtmp_chunk_get_stream_id (GstRtmpChunk *chunk);
guint32 gst_rtmp_chunk_get_timestamp (GstRtmpChunk *chunk);
GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk);
+/* chunk cache */
+
+GstRtmpChunkCache *gst_rtmp_chunk_cache_new (void);
+void gst_rtmp_chunk_cache_free (GstRtmpChunkCache *cache);
+GstRtmpChunkCacheEntry * gst_rtmp_chunk_cache_get (
+ GstRtmpChunkCache *cache, int stream_id);
+void gst_rtmp_chunk_cache_update (GstRtmpChunkCache *cache,
+ GstRtmpChunk *chunk);
+
G_END_DECLS
#endif
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 625f721..6e37caa 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -104,6 +104,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue = g_queue_new ();
+ rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new ();
+ rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new ();
}
void
@@ -156,6 +158,8 @@ gst_rtmp_connection_finalize (GObject * object)
GST_DEBUG_OBJECT (rtmpconnection, "finalize");
/* clean up object here */
+ gst_rtmp_chunk_cache_free (rtmpconnection->input_chunk_cache);
+ gst_rtmp_chunk_cache_free (rtmpconnection->output_chunk_cache);
G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
}
@@ -177,7 +181,6 @@ gst_rtmp_connection_new (GSocketConnection * connection)
(GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL);
g_source_attach (sc->input_source, NULL);
-
return sc;
}
@@ -209,15 +212,22 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
GST_DEBUG ("input ready");
- data = g_malloc (128);
+ data = g_malloc (4096);
ret =
g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
- data, 128, sc->cancellable, &error);
+ data, 4096, sc->cancellable, &error);
if (ret < 0) {
GST_ERROR ("read error: %s", error->message);
g_error_free (error);
return G_SOURCE_REMOVE;
}
+ if (ret == 0) {
+ /* FIXME probably closed */
+ GST_ERROR ("closed?");
+ return G_SOURCE_REMOVE;
+ }
+
+ GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", ret);
if (sc->input_bytes) {
sc->input_bytes = gst_rtmp_bytes_append (sc->input_bytes, data, ret);
@@ -225,15 +235,18 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
sc->input_bytes = g_bytes_new_take (data, ret);
}
- if (sc->input_callback) {
- if (g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) {
- GstRtmpConnectionCallback callback;
- GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
- g_bytes_get_size (sc->input_bytes));
- callback = sc->input_callback;
- sc->input_callback = NULL;
- (*callback) (sc);
- }
+ //GST_ERROR("ic: %p", sc->input_callback);
+ //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes));
+ GST_ERROR ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes);
+
+ while (sc->input_callback && sc->input_bytes &&
+ g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) {
+ GstRtmpConnectionCallback callback;
+ GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
+ g_bytes_get_size (sc->input_bytes));
+ callback = sc->input_callback;
+ sc->input_callback = NULL;
+ (*callback) (sc);
}
return G_SOURCE_CONTINUE;
@@ -246,10 +259,10 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
GstRtmpChunk *chunk;
GBytes *bytes;
- GST_ERROR ("output ready");
+ GST_DEBUG ("output ready");
if (sc->writing) {
- GST_ERROR ("busy writing");
+ GST_DEBUG ("busy writing");
return G_SOURCE_REMOVE;
}
@@ -257,11 +270,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
if (!chunk) {
return G_SOURCE_REMOVE;
}
+ GST_ERROR ("popped");
sc->writing = TRUE;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- bytes = gst_rtmp_chunk_serialize (chunk);
- //gst_rtmp_dump_data (bytes);
+ bytes = gst_rtmp_chunk_serialize (chunk, sc->output_chunk_cache);
+ gst_rtmp_chunk_cache_update (sc->output_chunk_cache, chunk);
+ gst_rtmp_dump_data (bytes);
g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
@@ -407,6 +422,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc)
GST_ERROR ("server handshake finished");
sc->handshake_complete = TRUE;
+ if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) {
+ GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
+ g_bytes_get_size (sc->input_bytes));
+ gst_rtmp_connection_chunk_callback (sc);
+ }
+
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
@@ -417,7 +438,7 @@ static void
gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
{
GstRtmpChunk *chunk;
- gsize size;
+ gsize size = 0;
while (1) {
GBytes *bytes;
@@ -425,17 +446,23 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
if (sc->input_bytes == NULL)
break;
- chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size);
+ chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size,
+ sc->input_chunk_cache);
if (chunk == NULL)
break;
+ gst_rtmp_chunk_cache_update (sc->input_chunk_cache, chunk);
+
bytes = gst_rtmp_connection_take_input_bytes (sc, size);
g_bytes_unref (bytes);
GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length);
g_signal_emit_by_name (sc, "got-chunk", chunk);
g_object_unref (chunk);
+
+ size = 0;
}
+ GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT, size);
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, size);
}
@@ -447,6 +474,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
g_return_if_fail (GST_IS_RTMP_CHUNK (chunk));
+ GST_ERROR ("queued");
chunk->priv = connection;
g_queue_push_tail (connection->output_queue, chunk);
gst_rtmp_connection_start_output (connection);
@@ -464,6 +492,18 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection,
if (needed_bytes == 0) {
connection->input_needed_bytes = 1;
}
+
+ if (connection->input_callback && connection->input_bytes &&
+ g_bytes_get_size (connection->input_bytes) >=
+ connection->input_needed_bytes) {
+ GstRtmpConnectionCallback callback;
+ GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
+ g_bytes_get_size (connection->input_bytes));
+ callback = connection->input_callback;
+ connection->input_callback = NULL;
+ (*callback) (connection);
+ }
+
}
void
@@ -581,7 +621,24 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
GST_ERROR ("client handshake finished");
sc->handshake_complete = TRUE;
+ if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) {
+ GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
+ g_bytes_get_size (sc->input_bytes));
+ gst_rtmp_connection_chunk_callback (sc);
+ }
+
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, 0);
gst_rtmp_connection_start_output (sc);
}
+
+void
+gst_rtmp_connection_dump (GstRtmpConnection * connection)
+{
+ g_print (" output_queue: %d\n",
+ g_queue_get_length (connection->output_queue));
+ g_print (" input_bytes: %" G_GSIZE_FORMAT "\n",
+ connection->input_bytes ? g_bytes_get_size (connection->input_bytes) : 0);
+ g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes);
+
+}
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index 21a057e..6823d81 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -57,6 +57,8 @@ struct _GstRtmpConnection
gsize input_needed_bytes;
GstRtmpConnectionCallback input_callback;
gboolean handshake_complete;
+ GstRtmpChunkCache *input_chunk_cache;
+ GstRtmpChunkCache *output_chunk_cache;
};
struct _GstRtmpConnectionClass
@@ -76,6 +78,7 @@ void gst_rtmp_connection_start_handshake (GstRtmpConnection *connection,
gboolean is_server);
void gst_rtmp_connection_queue_chunk (GstRtmpConnection *connection,
GstRtmpChunk *chunk);
+void gst_rtmp_connection_dump (GstRtmpConnection *connection);
G_END_DECLS
diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c
index 96f281d..7bb2854 100644
--- a/rtmp/rtmpserver.c
+++ b/rtmp/rtmpserver.c
@@ -74,7 +74,7 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass)
static void
gst_rtmp_server_init (GstRtmpServer * rtmpserver)
{
- rtmpserver->port = 11935;
+ rtmpserver->port = 1935;
}
void
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 11276dd..0a953fd 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -41,10 +41,12 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data);
static void
got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data);
+static gboolean periodic (gpointer user_data);
GstRtmpServer *server;
GstRtmpClient *client;
GstRtmpConnection *client_connection;
+GstRtmpConnection *server_connection;
GCancellable *cancellable;
GstRtmpChunk *proxy_chunk;
@@ -80,6 +82,7 @@ main (int argc, char *argv[])
client = gst_rtmp_client_new ();
cancellable = g_cancellable_new ();
+ g_timeout_add (1000, periodic, NULL);
main_loop = g_main_loop_new (NULL, TRUE);
g_main_loop_run (main_loop);
@@ -110,13 +113,14 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk,
GST_INFO ("got chunk");
bytes = gst_rtmp_chunk_get_payload (chunk);
- gst_rtmp_dump_data (bytes);
+ if (0)
+ gst_rtmp_dump_data (bytes);
proxy_conn = gst_rtmp_client_get_connection (client);
g_object_ref (chunk);
if (proxy_conn) {
- GST_ERROR ("sending to server: %" G_GSIZE_FORMAT, chunk->message_length);
+ GST_ERROR (">>>: %" G_GSIZE_FORMAT, chunk->message_length);
gst_rtmp_connection_queue_chunk (proxy_conn, chunk);
} else {
GST_ERROR ("saving first chunk");
@@ -144,6 +148,7 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data)
GstRtmpConnection *proxy_conn;
proxy_conn = gst_rtmp_client_get_connection (client);
+ server_connection = proxy_conn;
GST_ERROR ("sending to server: %" G_GSIZE_FORMAT,
proxy_chunk->message_length);
gst_rtmp_connection_queue_chunk (proxy_conn, proxy_chunk);
@@ -159,9 +164,30 @@ static void
got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk,
gpointer user_data)
{
+ GBytes *bytes;
+
GST_INFO ("got chunk");
g_object_ref (chunk);
- GST_ERROR ("sending to client: %" G_GSIZE_FORMAT, chunk->message_length);
+ GST_ERROR ("<<<: %" G_GSIZE_FORMAT, chunk->message_length);
+
+ bytes = gst_rtmp_chunk_get_payload (chunk);
+ gst_rtmp_dump_data (bytes);
+
gst_rtmp_connection_queue_chunk (client_connection, chunk);
}
+
+static gboolean
+periodic (gpointer user_data)
+{
+ g_print (".\n");
+ if (client_connection) {
+ g_print ("CLIENT:\n");
+ gst_rtmp_connection_dump (client_connection);
+ }
+ if (server_connection) {
+ g_print ("SERVER:\n");
+ gst_rtmp_connection_dump (server_connection);
+ }
+ return G_SOURCE_CONTINUE;
+}