diff options
author | David Schleef <ds@schleef.org> | 2014-08-26 21:24:01 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-26 21:24:30 -0700 |
commit | d84190d24394d3007ebad8f740cc32a16472028a (patch) | |
tree | 4c22e75d94f5eb4197403c138e74ff4bbc5d9eac | |
parent | d2e343f95296ac936cf25e2dc7c328e3b2bbe8de (diff) |
hacking
-rw-r--r-- | rtmp/rtmpchunk.c | 46 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 6 | ||||
-rw-r--r-- | rtmp/rtmpclient.c | 2 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 94 | ||||
-rw-r--r-- | rtmp/rtmpconnection.h | 3 | ||||
-rw-r--r-- | tools/proxy-server.c | 5 |
6 files changed, 120 insertions, 36 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index cc33183..f8bdd90 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -226,26 +226,30 @@ gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes, } GBytes * -gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache) +gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, + GstRtmpChunkHeader * previous_header, gsize max_chunk_size) { guint8 *data; const guint8 *chunkdata; gsize chunksize; int header_fmt; - GstRtmpChunkCacheEntry *cached; guint32 timestamp; int offset; + int i; /* FIXME this is incomplete and inefficient */ chunkdata = g_bytes_get_data (chunk->payload, &chunksize); - data = g_malloc (chunksize + 12); + g_assert (chunk->message_length == chunksize); + g_assert (chunk->stream_id < 64); + data = g_malloc (chunksize + 12 + (chunksize / max_chunk_size)); header_fmt = 0; - cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); - if (cached) { +#if 0 + if (previous_header->message_length > 0) { header_fmt = 1; - timestamp = chunk->timestamp - cached->previous_header.timestamp; + timestamp = chunk->timestamp - previous_header->timestamp; } +#endif g_assert (chunk->stream_id < 64); data[0] = (header_fmt << 6) | (chunk->stream_id); @@ -273,9 +277,25 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache) data[7] = chunk->message_type_id; offset = 8; } - memcpy (data + offset, chunkdata, chunksize); + if (chunk->message_type_id == 0x12 || chunk->message_type_id == 0x09 || + chunk->message_type_id == 0x14) { + for (i = 0; i < chunksize; i += max_chunk_size) { + if (i != 0) { + data[offset] = 0xc0 | chunk->stream_id; + offset++; + } + memcpy (data + offset, chunkdata + i, MIN (chunksize - i, + max_chunk_size)); + offset += MIN (chunksize - i, max_chunk_size); + } + } else { + memcpy (data + offset, chunkdata, chunksize); + offset += chunksize; + } + GST_ERROR ("type: %d in: %" G_GSIZE_FORMAT " out: %d", chunk->message_type_id, + chunksize, offset); - return g_bytes_new_take (data, chunksize + offset); + return g_bytes_new_take (data, offset); } void @@ -349,15 +369,9 @@ gst_rtmp_chunk_cache_get (GstRtmpChunkCache * cache, int stream_id) } void -gst_rtmp_chunk_cache_update (GstRtmpChunkCache * cache, GstRtmpChunk * chunk) +gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry, + GstRtmpChunk * chunk) { - GstRtmpChunkCacheEntry *entry; - entry = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); - if (entry == NULL) { - g_array_set_size (cache, cache->len + 1); - entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1); - entry->previous_header.stream_id = chunk->stream_id; - } entry->previous_header.timestamp = chunk->timestamp; entry->previous_header.message_length = chunk->message_length; entry->previous_header.message_type_id = chunk->message_type_id; diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 4834e35..3660f02 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -87,7 +87,7 @@ GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes, GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size, GstRtmpChunkCache *cache); GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk, - GstRtmpChunkCache *cache); + GstRtmpChunkHeader *previous_header, gsize max_chunk_size); void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id); void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp); @@ -108,8 +108,8 @@ GstRtmpChunkCache *gst_rtmp_chunk_cache_new (void); void gst_rtmp_chunk_cache_free (GstRtmpChunkCache *cache); GstRtmpChunkCacheEntry * gst_rtmp_chunk_cache_get ( GstRtmpChunkCache *cache, int stream_id); -void gst_rtmp_chunk_cache_update (GstRtmpChunkCache *cache, - GstRtmpChunk *chunk); +void gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry, + GstRtmpChunk * chunk); G_END_DECLS diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c index 2a53187..088b632 100644 --- a/rtmp/rtmpclient.c +++ b/rtmp/rtmpclient.c @@ -179,7 +179,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client, addr = g_network_address_new - ("ec2-54-188-128-44.us-west-2.compute.amazonaws.com", 1935); + ("ec2-54-190-75-249.us-west-2.compute.amazonaws.com", 1935); client->socket_client = g_socket_client_new (); GST_DEBUG ("g_socket_client_connect_async"); diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index e8dc6ae..d0b7d86 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -67,6 +67,9 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection, gsize needed_bytes); static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc); static void gst_rtmp_connection_start_output (GstRtmpConnection * sc); +static void +gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, + GstRtmpChunk * chunk); static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc); @@ -106,6 +109,9 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) rtmpconnection->output_queue = g_queue_new (); rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new (); rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new (); + + rtmpconnection->in_chunk_size = 128; + rtmpconnection->out_chunk_size = 128; } void @@ -227,7 +233,7 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) return G_SOURCE_REMOVE; } - GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", ret); + GST_DEBUG ("read %" G_GSIZE_FORMAT " bytes", ret); if (sc->input_bytes) { sc->input_bytes = gst_rtmp_bytes_append (sc->input_bytes, data, ret); @@ -237,12 +243,12 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) //GST_ERROR("ic: %p", sc->input_callback); //GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); - GST_ERROR ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes); + GST_DEBUG ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes); while (sc->input_callback && sc->input_bytes && g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) { GstRtmpConnectionCallback callback; - GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback", + GST_DEBUG ("got %" G_GSIZE_FORMAT " bytes, calling callback", g_bytes_get_size (sc->input_bytes)); callback = sc->input_callback; sc->input_callback = NULL; @@ -258,6 +264,7 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data); GstRtmpChunk *chunk; GBytes *bytes; + GstRtmpChunkCacheEntry *entry; GST_DEBUG ("output ready"); @@ -270,13 +277,15 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data) if (!chunk) { return G_SOURCE_REMOVE; } - GST_ERROR ("popped"); sc->writing = TRUE; os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection)); - bytes = gst_rtmp_chunk_serialize (chunk, sc->output_chunk_cache); - gst_rtmp_chunk_cache_update (sc->output_chunk_cache, chunk); - gst_rtmp_dump_data (bytes); + + entry = gst_rtmp_chunk_cache_get (sc->output_chunk_cache, chunk->stream_id); + bytes = gst_rtmp_chunk_serialize (chunk, &entry->previous_header, + sc->out_chunk_size); + gst_rtmp_chunk_cache_update (entry, chunk); + //gst_rtmp_dump_data (bytes); g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT, sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk); @@ -301,6 +310,7 @@ gst_rtmp_connection_write_chunk_done (GObject * obj, ret = g_output_stream_write_bytes_finish (os, res, &error); if (ret < 0) { GST_ERROR ("write error: %s", error->message); + g_assert_not_reached (); g_error_free (error); return; } @@ -452,6 +462,8 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) if (sc->input_bytes == NULL) break; + //gst_rtmp_dump_data (sc->input_bytes); + ret = gst_rtmp_chunk_parse_header1 (&header, sc->input_bytes); if (!ret) { needed_bytes = header.header_size; @@ -472,7 +484,12 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) } remaining_bytes = header.message_length - entry->offset; - chunk_bytes = MIN (remaining_bytes, 128); + if (header.message_type_id == 0x12 || header.message_type_id == 0x09 || + header.message_type_id == 0x14) { + chunk_bytes = MIN (remaining_bytes, sc->in_chunk_size); + } else { + chunk_bytes = remaining_bytes; + } data = g_bytes_get_data (sc->input_bytes, &size); if (header.header_size + chunk_bytes > size) { @@ -504,23 +521,72 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) header.message_length); entry->payload = NULL; - gst_rtmp_dump_data (entry->chunk->payload); + //gst_rtmp_dump_data (entry->chunk->payload); - GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", - entry->chunk->message_length); - g_signal_emit_by_name (sc, "got-chunk", entry->chunk); - g_object_unref (entry->chunk); + if (entry->chunk->stream_id == 0x02) { + GST_ERROR ("got protocol control message, type: %d", + entry->chunk->message_type_id); + gst_rtmp_connection_handle_pcm (sc, entry->chunk); + g_object_unref (entry->chunk); + } else { + GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", + entry->chunk->message_length); + g_signal_emit_by_name (sc, "got-chunk", entry->chunk); + g_object_unref (entry->chunk); + } entry->chunk = NULL; entry->offset = 0; } } - GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT ", have %" + GST_DEBUG ("setting needed bytes to %" G_GSIZE_FORMAT ", have %" G_GSIZE_FORMAT, needed_bytes, size); gst_rtmp_connection_set_input_callback (sc, gst_rtmp_connection_chunk_callback, needed_bytes); } +static void +gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection, + GstRtmpChunk * chunk) +{ + const guint8 *data; + gsize size; + guint32 moo; + guint32 moo2; + data = g_bytes_get_data (chunk->payload, &size); + switch (chunk->message_type_id) { + case 0x01: + moo = GST_READ_UINT32_BE (data); + GST_ERROR ("new chunk size %d", moo); + connection->in_chunk_size = moo; + break; + case 0x02: + moo = GST_READ_UINT32_BE (data); + GST_ERROR ("chunk abort, stream_id = %d", moo); + break; + case 0x03: + moo = GST_READ_UINT32_BE (data); + GST_ERROR ("acknowledgement %d", moo); + break; + case 0x04: + moo = GST_READ_UINT16_BE (data); + moo2 = GST_READ_UINT32_BE (data + 2); + GST_ERROR ("control: %d, %d", moo, moo2); + break; + case 0x05: + moo = GST_READ_UINT32_BE (data); + GST_ERROR ("window ack size: %d", moo); + break; + case 0x06: + moo = GST_READ_UINT32_BE (data); + GST_ERROR ("set peer bandwidth: %d", moo); + break; + default: + GST_ERROR ("unimplemented"); + break; + } +} + void gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection, GstRtmpChunk * chunk) diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h index 6823d81..deae843 100644 --- a/rtmp/rtmpconnection.h +++ b/rtmp/rtmpconnection.h @@ -59,6 +59,9 @@ struct _GstRtmpConnection gboolean handshake_complete; GstRtmpChunkCache *input_chunk_cache; GstRtmpChunkCache *output_chunk_cache; + + gsize in_chunk_size; + gsize out_chunk_size; }; struct _GstRtmpConnectionClass diff --git a/tools/proxy-server.c b/tools/proxy-server.c index 0a953fd..e9e1d52 100644 --- a/tools/proxy-server.c +++ b/tools/proxy-server.c @@ -94,7 +94,7 @@ static void add_connection (GstRtmpServer * server, GstRtmpConnection * connection, gpointer user_data) { - GST_INFO ("new connection"); + GST_ERROR ("new connection"); g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL); @@ -172,7 +172,8 @@ got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk, GST_ERROR ("<<<: %" G_GSIZE_FORMAT, chunk->message_length); bytes = gst_rtmp_chunk_get_payload (chunk); - gst_rtmp_dump_data (bytes); + if (0) + gst_rtmp_dump_data (bytes); gst_rtmp_connection_queue_chunk (client_connection, chunk); } |