diff options
author | David Schleef <ds@schleef.org> | 2014-08-25 10:46:22 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-25 10:46:22 -0700 |
commit | 2525802c534bb87145088a5c69a6df2ff3f57542 (patch) | |
tree | 04edb707b9add478ba9f3536b77109ae902c50c1 | |
parent | 3574a53177db9e165a4a952020228703231d910d (diff) |
hacking
-rw-r--r-- | rtmp/rtmpchunk.c | 185 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 26 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 93 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 3 | ||||
-rw-r--r-- | rtmp/rtmpserver.c | 2 | ||||
-rw-r--r-- | tools/proxy-server.c | 32 |
6 files changed, 268 insertions, 73 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index 2a8fe7b..8f27e37 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -129,7 +129,8 @@ gst_rtmp_chunk_new (void) } static GstRtmpChunkParseStatus -chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes) +chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes, + GstRtmpChunkCache * cache) { int offset; const guint8 *data; @@ -182,69 +183,91 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes) chunk->info = (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; offset += 4; - } else if (header_fmt == 1) { - if (size < offset + 7) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - GST_ERROR ("unimplemented: need previous chunk"); - chunk->timestamp = - (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; - offset += 3; - chunk->message_length = - (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; - offset += 3; - } else if (header_fmt == 2) { - if (size < offset + 3) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - chunk->timestamp = 0; - GST_ERROR ("unimplemented: need previous chunk"); - return GST_RTMP_CHUNK_PARSE_ERROR; } else { - chunk->timestamp = 0; - GST_ERROR ("unimplemented: need previous chunk"); - return GST_RTMP_CHUNK_PARSE_ERROR; - } + GstRtmpChunkCacheEntry *cached; + + cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); + if (cached) { + chunk->timestamp = cached->timestamp; + chunk->message_length = cached->message_length; + chunk->message_type_id = cached->message_type_id; + chunk->info = cached->info; + } else { + GST_ERROR ("uncached chunk, stream_id = %d", chunk->stream_id); + } - if (needed_bytes) - *needed_bytes = offset + chunk->message_length; - if (size < offset + chunk->message_length) - return GST_RTMP_CHUNK_PARSE_NEED_BYTES; + if (header_fmt == 1) { + if (size < offset + 7) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->timestamp += + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + chunk->message_length = + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + chunk->message_type_id = data[offset]; + offset += 1; + } else if (header_fmt == 2) { + if (size < offset + 3) + return GST_RTMP_CHUNK_PARSE_UNKNOWN; + chunk->timestamp += + (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; + offset += 3; + } else { + /* ok */ + } + } -#if 0 - { + /* WTF */ + if ((header_fmt < 2) && + (chunk->message_type_id == 0x12 || chunk->message_type_id == 0x09)) { + chunk->message_length += (chunk->message_length >> 7); + } +#if 1 + if (1) { GBytes *b; GST_ERROR ("PARSED CHUNK:"); - b = g_bytes_new_from_bytes (bytes, 0, offset + chunk->message_length); + b = g_bytes_new_from_bytes (bytes, 0, size); gst_rtmp_dump_data (b); g_bytes_unref (b); + //*(int *)0 = 1; } #endif + if (needed_bytes) + *needed_bytes = offset + chunk->message_length; + if (size < offset + chunk->message_length) + return GST_RTMP_CHUNK_PARSE_NEED_BYTES; + chunk->payload = g_bytes_new_from_bytes (bytes, offset, chunk->message_length); return GST_RTMP_CHUNK_PARSE_OK; } GstRtmpChunkParseStatus -gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size) +gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size, + GstRtmpChunkCache * cache) { GstRtmpChunk *chunk; GstRtmpChunkParseStatus status; chunk = gst_rtmp_chunk_new (); - status = chunk_parse (chunk, bytes, chunk_size); + status = chunk_parse (chunk, bytes, chunk_size, cache); g_object_unref (chunk); return status; } GstRtmpChunk * -gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size) +gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size, + GstRtmpChunkCache * cache) { GstRtmpChunk *chunk; GstRtmpChunkParseStatus status; chunk = gst_rtmp_chunk_new (); - status = chunk_parse (chunk, bytes, chunk_size); + status = chunk_parse (chunk, bytes, chunk_size, cache); + GST_ERROR ("status %d", status); if (status == GST_RTMP_CHUNK_PARSE_OK) return chunk; @@ -253,34 +276,56 @@ gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size) } GBytes * -gst_rtmp_chunk_serialize (GstRtmpChunk * chunk) +gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache) { guint8 *data; const guint8 *chunkdata; gsize chunksize; int header_fmt; + GstRtmpChunkCacheEntry *cached; + guint32 timestamp; + int offset; /* FIXME this is incomplete and inefficient */ chunkdata = g_bytes_get_data (chunk->payload, &chunksize); data = g_malloc (chunksize + 12); + header_fmt = 0; + cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); + if (cached) { + header_fmt = 1; + timestamp = chunk->timestamp - cached->timestamp; + } + g_assert (chunk->stream_id < 64); data[0] = (header_fmt << 6) | (chunk->stream_id); - g_assert (chunk->timestamp < 0xffffff); - data[1] = (chunk->timestamp >> 16) & 0xff; - data[2] = (chunk->timestamp >> 8) & 0xff; - data[3] = chunk->timestamp & 0xff; - data[4] = (chunk->message_length >> 16) & 0xff; - data[5] = (chunk->message_length >> 8) & 0xff; - data[6] = chunk->message_length & 0xff; - data[7] = chunk->message_type_id; - data[8] = 0; - data[9] = 0; - data[10] = 0; - data[11] = 0; - memcpy (data + 12, chunkdata, chunksize); - - return g_bytes_new_take (data, chunksize + 12); + if (header_fmt == 0) { + g_assert (chunk->timestamp < 0xffffff); + data[1] = (chunk->timestamp >> 16) & 0xff; + data[2] = (chunk->timestamp >> 8) & 0xff; + data[3] = chunk->timestamp & 0xff; + data[4] = (chunk->message_length >> 16) & 0xff; + data[5] = (chunk->message_length >> 8) & 0xff; + data[6] = chunk->message_length & 0xff; + data[7] = chunk->message_type_id; + data[8] = (chunk->info >> 24) & 0xff; + data[9] = (chunk->info >> 16) & 0xff; + data[10] = (chunk->info >> 8) & 0xff; + data[11] = chunk->info & 0xff; + offset = 12; + } else { + data[1] = (timestamp >> 16) & 0xff; + data[2] = (timestamp >> 8) & 0xff; + data[3] = timestamp & 0xff; + data[4] = (chunk->message_length >> 16) & 0xff; + data[5] = (chunk->message_length >> 8) & 0xff; + data[6] = chunk->message_length & 0xff; + data[7] = chunk->message_type_id; + offset = 8; + } + memcpy (data + offset, chunkdata, chunksize); + + return g_bytes_new_take (data, chunksize + offset); } void @@ -321,3 +366,47 @@ gst_rtmp_chunk_get_payload (GstRtmpChunk * chunk) { return chunk->payload; } + +/* chunk cache */ + +GstRtmpChunkCache * +gst_rtmp_chunk_cache_new (void) +{ + return g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkCacheEntry)); +} + +void +gst_rtmp_chunk_cache_free (GstRtmpChunkCache * cache) +{ + g_array_free (cache, TRUE); + +} + +GstRtmpChunkCacheEntry * +gst_rtmp_chunk_cache_get (GstRtmpChunkCache * cache, int stream_id) +{ + int i; + GstRtmpChunkCacheEntry *entry; + for (i = 0; i < cache->len; i++) { + entry = &g_array_index (cache, GstRtmpChunkCacheEntry, i); + if (entry->stream_id == stream_id) + return entry; + } + return NULL; +} + +void +gst_rtmp_chunk_cache_update (GstRtmpChunkCache * cache, GstRtmpChunk * chunk) +{ + GstRtmpChunkCacheEntry *entry; + entry = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); + if (entry == NULL) { + g_array_set_size (cache, cache->len + 1); + entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1); + entry->stream_id = chunk->stream_id; + } + entry->timestamp = chunk->timestamp; + entry->message_length = chunk->message_length; + entry->message_type_id = chunk->message_type_id; + entry->info = chunk->info; +} diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 834e217..267e871 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -32,6 +32,15 @@ G_BEGIN_DECLS typedef struct _GstRtmpChunk GstRtmpChunk; typedef struct _GstRtmpChunkClass GstRtmpChunkClass; +typedef GArray GstRtmpChunkCache; +typedef struct _GstRtmpChunkCacheEntry GstRtmpChunkCacheEntry; +struct _GstRtmpChunkCacheEntry { + guint32 stream_id; + guint32 timestamp; + gsize message_length; + int message_type_id; + guint32 info; +}; struct _GstRtmpChunk { @@ -63,9 +72,11 @@ GType gst_rtmp_chunk_get_type (void); GstRtmpChunk *gst_rtmp_chunk_new (void); GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes, - gsize *chunk_size); -GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size); -GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk); + gsize *chunk_size, GstRtmpChunkCache *cache); +GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size, + GstRtmpChunkCache *cache); +GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk, + GstRtmpChunkCache *cache); void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id); void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp); @@ -75,6 +86,15 @@ guint32 gst_rtmp_chunk_get_stream_id (GstRtmpChunk *chunk); guint32 gst_rtmp_chunk_get_timestamp (GstRtmpChunk *chunk); GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk); +/* chunk cache */ + +GstRtmpChunkCache *gst_rtmp_chunk_cache_new (void); +void gst_rtmp_chunk_cache_free (GstRtmpChunkCache *cache); +GstRtmpChunkCacheEntry * gst_rtmp_chunk_cache_get ( + GstRtmpChunkCache *cache, int stream_id); +void gst_rtmp_chunk_cache_update (GstRtmpChunkCache *cache, + GstRtmpChunk *chunk); + G_END_DECLS #endif diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 625f721..6e37caa 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -104,6 +104,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) { rtmpconnection->cancellable = g_cancellable_new (); rtmpconnection->output_queue = g_queue_new (); + rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new (); + rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new (); } void @@ -156,6 +158,8 @@ gst_rtmp_connection_finalize (GObject * object) GST_DEBUG_OBJECT (rtmpconnection, "finalize"); /* clean up object here */ + gst_rtmp_chunk_cache_free (rtmpconnection->input_chunk_cache); + gst_rtmp_chunk_cache_free (rtmpconnection->output_chunk_cache); G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object); } @@ -177,7 +181,6 @@ gst_rtmp_connection_new (GSocketConnection * connection) (GSourceFunc) gst_rtmp_connection_input_ready, sc, NULL); g_source_attach (sc->input_source, NULL); - return sc; } @@ -209,15 +212,22 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) GST_DEBUG ("input ready"); - data = g_malloc (128); + data = g_malloc (4096); ret = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is), - data, 128, sc->cancellable, &error); + data, 4096, sc->cancellable, &error); if (ret < 0) { GST_ERROR ("read error: %s", error->message); g_error_free (error); return G_SOURCE_REMOVE; } + if (ret == 0) { + /* FIXME probably closed */ + GST_ERROR ("closed?"); + return G_SOURCE_REMOVE; + } + + GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", ret); if (sc->input_bytes) { sc->input_bytes = gst_rtmp_bytes_append (sc->input_bytes, data, ret); @@ -225,15 +235,18 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) sc->input_bytes = g_bytes_new_take (data, ret); } - if (sc->input_callback) { - if (g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) { - GstRtmpConnectionCallback callback; - GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", - g_bytes_get_size (sc->input_bytes)); - callback = sc->input_callback; - sc->input_callback = NULL; - (*callback) (sc); - } + //GST_ERROR("ic: %p", sc->input_callback); + //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); + GST_ERROR ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes); + + while (sc->input_callback && sc->input_bytes && + g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) { + GstRtmpConnectionCallback callback; + GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", + g_bytes_get_size (sc->input_bytes)); + callback = sc->input_callback; + sc->input_callback = NULL; + (*callback) (sc); } return G_SOURCE_CONTINUE; @@ -246,10 +259,10 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) GstRtmpChunk *chunk; GBytes *bytes; - GST_ERROR ("output ready"); + GST_DEBUG ("output ready"); if (sc->writing) { - GST_ERROR ("busy writing"); + GST_DEBUG ("busy writing"); return G_SOURCE_REMOVE; } @@ -257,11 +270,13 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) if (!chunk) { return G_SOURCE_REMOVE; } + GST_ERROR ("popped"); sc->writing = TRUE; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - bytes = gst_rtmp_chunk_serialize (chunk); - //gst_rtmp_dump_data (bytes); + bytes = gst_rtmp_chunk_serialize (chunk, sc->output_chunk_cache); + gst_rtmp_chunk_cache_update (sc->output_chunk_cache, chunk); + gst_rtmp_dump_data (bytes); g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); @@ -407,6 +422,12 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc) GST_ERROR ("server handshake finished"); sc->handshake_complete = TRUE; + if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) { + GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, + g_bytes_get_size (sc->input_bytes)); + gst_rtmp_connection_chunk_callback (sc); + } + gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); @@ -417,7 +438,7 @@ static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) { GstRtmpChunk *chunk; - gsize size; + gsize size = 0; while (1) { GBytes *bytes; @@ -425,17 +446,23 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) if (sc->input_bytes == NULL) break; - chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size); + chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size, + sc->input_chunk_cache); if (chunk == NULL) break; + gst_rtmp_chunk_cache_update (sc->input_chunk_cache, chunk); + bytes = gst_rtmp_connection_take_input_bytes (sc, size); g_bytes_unref (bytes); GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length); g_signal_emit_by_name (sc, "got-chunk", chunk); g_object_unref (chunk); + + size = 0; } + GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT, size); gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, size); } @@ -447,6 +474,7 @@ gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, g_return_if_fail (GST_IS_RTMP_CONNECTION (connection)); g_return_if_fail (GST_IS_RTMP_CHUNK (chunk)); + GST_ERROR ("queued"); chunk->priv = connection; g_queue_push_tail (connection->output_queue, chunk); gst_rtmp_connection_start_output (connection); @@ -464,6 +492,18 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection, if (needed_bytes == 0) { connection->input_needed_bytes = 1; } + + if (connection->input_callback && connection->input_bytes && + g_bytes_get_size (connection->input_bytes) >= + connection->input_needed_bytes) { + GstRtmpConnectionCallback callback; + GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", + g_bytes_get_size (connection->input_bytes)); + callback = connection->input_callback; + connection->input_callback = NULL; + (*callback) (connection); + } + } void @@ -581,7 +621,24 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj, GST_ERROR ("client handshake finished"); sc->handshake_complete = TRUE; + if (sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= 1) { + GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, + g_bytes_get_size (sc->input_bytes)); + gst_rtmp_connection_chunk_callback (sc); + } + gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, 0); gst_rtmp_connection_start_output (sc); } + +void +gst_rtmp_connection_dump (GstRtmpConnection * connection) +{ + g_print (" output_queue: %d\n", + g_queue_get_length (connection->output_queue)); + g_print (" input_bytes: %" G_GSIZE_FORMAT "\n", + connection->input_bytes ? g_bytes_get_size (connection->input_bytes) : 0); + g_print (" needed: %" G_GSIZE_FORMAT "\n", connection->input_needed_bytes); + +} diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index 21a057e..6823d81 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -57,6 +57,8 @@ struct _GstRtmpConnection gsize input_needed_bytes; GstRtmpConnectionCallback input_callback; gboolean handshake_complete; + GstRtmpChunkCache *input_chunk_cache; + GstRtmpChunkCache *output_chunk_cache; }; struct _GstRtmpConnectionClass @@ -76,6 +78,7 @@ void gst_rtmp_connection_start_handshake (GstRtmpConnection *connection, gboolean is_server); void gst_rtmp_connection_queue_chunk (GstRtmpConnection *connection, GstRtmpChunk *chunk); +void gst_rtmp_connection_dump (GstRtmpConnection *connection); G_END_DECLS diff --git a/rtmp/rtmpserver.c b/rtmp/rtmpserver.c index 96f281d..7bb2854 100644 --- a/rtmp/rtmpserver.c +++ b/rtmp/rtmpserver.c @@ -74,7 +74,7 @@ gst_rtmp_server_class_init (GstRtmpServerClass * klass) static void gst_rtmp_server_init (GstRtmpServer * rtmpserver) { - rtmpserver->port = 11935; + rtmpserver->port = 1935; } void diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 11276dd..0a953fd 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -41,10 +41,12 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data); static void got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data); +static gboolean periodic (gpointer user_data); GstRtmpServer *server; GstRtmpClient *client; GstRtmpConnection *client_connection; +GstRtmpConnection *server_connection; GCancellable *cancellable; GstRtmpChunk *proxy_chunk; @@ -80,6 +82,7 @@ main (int argc, char *argv[]) client = gst_rtmp_client_new (); cancellable = g_cancellable_new (); + g_timeout_add (1000, periodic, NULL); main_loop = g_main_loop_new (NULL, TRUE); g_main_loop_run (main_loop); @@ -110,13 +113,14 @@ got_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk, GST_INFO ("got chunk"); bytes = gst_rtmp_chunk_get_payload (chunk); - gst_rtmp_dump_data (bytes); + if (0) + gst_rtmp_dump_data (bytes); proxy_conn = gst_rtmp_client_get_connection (client); g_object_ref (chunk); if (proxy_conn) { - GST_ERROR ("sending to server: %" G_GSIZE_FORMAT, chunk->message_length); + GST_ERROR (">>>: %" G_GSIZE_FORMAT, chunk->message_length); gst_rtmp_connection_queue_chunk (proxy_conn, chunk); } else { GST_ERROR ("saving first chunk"); @@ -144,6 +148,7 @@ connect_done (GObject * source, GAsyncResult * result, gpointer user_data) GstRtmpConnection *proxy_conn; proxy_conn = gst_rtmp_client_get_connection (client); + server_connection = proxy_conn; GST_ERROR ("sending to server: %" G_GSIZE_FORMAT, proxy_chunk->message_length); gst_rtmp_connection_queue_chunk (proxy_conn, proxy_chunk); @@ -159,9 +164,30 @@ static void got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk, gpointer user_data) { + GBytes *bytes; + GST_INFO ("got chunk"); g_object_ref (chunk); - GST_ERROR ("sending to client: %" G_GSIZE_FORMAT, chunk->message_length); + GST_ERROR ("<<<: %" G_GSIZE_FORMAT, chunk->message_length); + + bytes = gst_rtmp_chunk_get_payload (chunk); + gst_rtmp_dump_data (bytes); + gst_rtmp_connection_queue_chunk (client_connection, chunk); } + +static gboolean +periodic (gpointer user_data) +{ + g_print (".\n"); + if (client_connection) { + g_print ("CLIENT:\n"); + gst_rtmp_connection_dump (client_connection); + } + if (server_connection) { + g_print ("SERVER:\n"); + gst_rtmp_connection_dump (server_connection); + } + return G_SOURCE_CONTINUE; +} |