summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-26 21:24:01 -0700
committerDavid Schleef <ds@schleef.org>2014-08-26 21:24:30 -0700
commitd84190d24394d3007ebad8f740cc32a16472028a (patch)
tree4c22e75d94f5eb4197403c138e74ff4bbc5d9eac
parentd2e343f95296ac936cf25e2dc7c328e3b2bbe8de (diff)
hacking
-rw-r--r--rtmp/rtmpchunk.c46
-rw-r--r--rtmp/rtmpchunk.h6
-rw-r--r--rtmp/rtmpclient.c2
-rw-r--r--rtmp/rtmpconnection.c94
-rw-r--r--rtmp/rtmpconnection.h3
-rw-r--r--tools/proxy-server.c5
6 files changed, 120 insertions, 36 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index cc33183..f8bdd90 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -226,26 +226,30 @@ gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes,
}
GBytes *
-gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache)
+gst_rtmp_chunk_serialize (GstRtmpChunk * chunk,
+ GstRtmpChunkHeader * previous_header, gsize max_chunk_size)
{
guint8 *data;
const guint8 *chunkdata;
gsize chunksize;
int header_fmt;
- GstRtmpChunkCacheEntry *cached;
guint32 timestamp;
int offset;
+ int i;
/* FIXME this is incomplete and inefficient */
chunkdata = g_bytes_get_data (chunk->payload, &chunksize);
- data = g_malloc (chunksize + 12);
+ g_assert (chunk->message_length == chunksize);
+ g_assert (chunk->stream_id < 64);
+ data = g_malloc (chunksize + 12 + (chunksize / max_chunk_size));
header_fmt = 0;
- cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
- if (cached) {
+#if 0
+ if (previous_header->message_length > 0) {
header_fmt = 1;
- timestamp = chunk->timestamp - cached->previous_header.timestamp;
+ timestamp = chunk->timestamp - previous_header->timestamp;
}
+#endif
g_assert (chunk->stream_id < 64);
data[0] = (header_fmt << 6) | (chunk->stream_id);
@@ -273,9 +277,25 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache)
data[7] = chunk->message_type_id;
offset = 8;
}
- memcpy (data + offset, chunkdata, chunksize);
+ if (chunk->message_type_id == 0x12 || chunk->message_type_id == 0x09 ||
+ chunk->message_type_id == 0x14) {
+ for (i = 0; i < chunksize; i += max_chunk_size) {
+ if (i != 0) {
+ data[offset] = 0xc0 | chunk->stream_id;
+ offset++;
+ }
+ memcpy (data + offset, chunkdata + i, MIN (chunksize - i,
+ max_chunk_size));
+ offset += MIN (chunksize - i, max_chunk_size);
+ }
+ } else {
+ memcpy (data + offset, chunkdata, chunksize);
+ offset += chunksize;
+ }
+ GST_ERROR ("type: %d in: %" G_GSIZE_FORMAT " out: %d", chunk->message_type_id,
+ chunksize, offset);
- return g_bytes_new_take (data, chunksize + offset);
+ return g_bytes_new_take (data, offset);
}
void
@@ -349,15 +369,9 @@ gst_rtmp_chunk_cache_get (GstRtmpChunkCache * cache, int stream_id)
}
void
-gst_rtmp_chunk_cache_update (GstRtmpChunkCache * cache, GstRtmpChunk * chunk)
+gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry,
+ GstRtmpChunk * chunk)
{
- GstRtmpChunkCacheEntry *entry;
- entry = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
- if (entry == NULL) {
- g_array_set_size (cache, cache->len + 1);
- entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1);
- entry->previous_header.stream_id = chunk->stream_id;
- }
entry->previous_header.timestamp = chunk->timestamp;
entry->previous_header.message_length = chunk->message_length;
entry->previous_header.message_type_id = chunk->message_type_id;
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 4834e35..3660f02 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -87,7 +87,7 @@ GstRtmpChunkParseStatus gst_rtmp_chunk_can_parse (GBytes *bytes,
GstRtmpChunk * gst_rtmp_chunk_new_parse (GBytes *bytes, gsize *chunk_size,
GstRtmpChunkCache *cache);
GBytes * gst_rtmp_chunk_serialize (GstRtmpChunk *chunk,
- GstRtmpChunkCache *cache);
+ GstRtmpChunkHeader *previous_header, gsize max_chunk_size);
void gst_rtmp_chunk_set_stream_id (GstRtmpChunk *chunk, guint32 stream_id);
void gst_rtmp_chunk_set_timestamp (GstRtmpChunk *chunk, guint32 timestamp);
@@ -108,8 +108,8 @@ GstRtmpChunkCache *gst_rtmp_chunk_cache_new (void);
void gst_rtmp_chunk_cache_free (GstRtmpChunkCache *cache);
GstRtmpChunkCacheEntry * gst_rtmp_chunk_cache_get (
GstRtmpChunkCache *cache, int stream_id);
-void gst_rtmp_chunk_cache_update (GstRtmpChunkCache *cache,
- GstRtmpChunk *chunk);
+void gst_rtmp_chunk_cache_update (GstRtmpChunkCacheEntry * entry,
+ GstRtmpChunk * chunk);
G_END_DECLS
diff --git a/rtmp/rtmpclient.c b/rtmp/rtmpclient.c
index 2a53187..088b632 100644
--- a/rtmp/rtmpclient.c
+++ b/rtmp/rtmpclient.c
@@ -179,7 +179,7 @@ gst_rtmp_client_connect_async (GstRtmpClient * client,
addr =
g_network_address_new
- ("ec2-54-188-128-44.us-west-2.compute.amazonaws.com", 1935);
+ ("ec2-54-190-75-249.us-west-2.compute.amazonaws.com", 1935);
client->socket_client = g_socket_client_new ();
GST_DEBUG ("g_socket_client_connect_async");
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index e8dc6ae..d0b7d86 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -67,6 +67,9 @@ gst_rtmp_connection_set_input_callback (GstRtmpConnection * connection,
gsize needed_bytes);
static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc);
static void gst_rtmp_connection_start_output (GstRtmpConnection * sc);
+static void
+gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk);
static void gst_rtmp_connection_server_handshake1 (GstRtmpConnection * sc);
@@ -106,6 +109,9 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
rtmpconnection->output_queue = g_queue_new ();
rtmpconnection->input_chunk_cache = gst_rtmp_chunk_cache_new ();
rtmpconnection->output_chunk_cache = gst_rtmp_chunk_cache_new ();
+
+ rtmpconnection->in_chunk_size = 128;
+ rtmpconnection->out_chunk_size = 128;
}
void
@@ -227,7 +233,7 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
return G_SOURCE_REMOVE;
}
- GST_ERROR ("read %" G_GSIZE_FORMAT " bytes", ret);
+ GST_DEBUG ("read %" G_GSIZE_FORMAT " bytes", ret);
if (sc->input_bytes) {
sc->input_bytes = gst_rtmp_bytes_append (sc->input_bytes, data, ret);
@@ -237,12 +243,12 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
//GST_ERROR("ic: %p", sc->input_callback);
//GST_ERROR("in queue: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes));
- GST_ERROR ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes);
+ GST_DEBUG ("needed: %" G_GSIZE_FORMAT, sc->input_needed_bytes);
while (sc->input_callback && sc->input_bytes &&
g_bytes_get_size (sc->input_bytes) >= sc->input_needed_bytes) {
GstRtmpConnectionCallback callback;
- GST_ERROR ("got %" G_GSIZE_FORMAT " bytes, calling callback",
+ GST_DEBUG ("got %" G_GSIZE_FORMAT " bytes, calling callback",
g_bytes_get_size (sc->input_bytes));
callback = sc->input_callback;
sc->input_callback = NULL;
@@ -258,6 +264,7 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
GstRtmpConnection *sc = GST_RTMP_CONNECTION (user_data);
GstRtmpChunk *chunk;
GBytes *bytes;
+ GstRtmpChunkCacheEntry *entry;
GST_DEBUG ("output ready");
@@ -270,13 +277,15 @@ gst_rtmp_connection_output_ready (GOutputStream * os, gpointer user_data)
if (!chunk) {
return G_SOURCE_REMOVE;
}
- GST_ERROR ("popped");
sc->writing = TRUE;
os = g_io_stream_get_output_stream (G_IO_STREAM (sc->connection));
- bytes = gst_rtmp_chunk_serialize (chunk, sc->output_chunk_cache);
- gst_rtmp_chunk_cache_update (sc->output_chunk_cache, chunk);
- gst_rtmp_dump_data (bytes);
+
+ entry = gst_rtmp_chunk_cache_get (sc->output_chunk_cache, chunk->stream_id);
+ bytes = gst_rtmp_chunk_serialize (chunk, &entry->previous_header,
+ sc->out_chunk_size);
+ gst_rtmp_chunk_cache_update (entry, chunk);
+ //gst_rtmp_dump_data (bytes);
g_output_stream_write_bytes_async (os, bytes, G_PRIORITY_DEFAULT,
sc->cancellable, gst_rtmp_connection_write_chunk_done, chunk);
@@ -301,6 +310,7 @@ gst_rtmp_connection_write_chunk_done (GObject * obj,
ret = g_output_stream_write_bytes_finish (os, res, &error);
if (ret < 0) {
GST_ERROR ("write error: %s", error->message);
+ g_assert_not_reached ();
g_error_free (error);
return;
}
@@ -452,6 +462,8 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
if (sc->input_bytes == NULL)
break;
+ //gst_rtmp_dump_data (sc->input_bytes);
+
ret = gst_rtmp_chunk_parse_header1 (&header, sc->input_bytes);
if (!ret) {
needed_bytes = header.header_size;
@@ -472,7 +484,12 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
}
remaining_bytes = header.message_length - entry->offset;
- chunk_bytes = MIN (remaining_bytes, 128);
+ if (header.message_type_id == 0x12 || header.message_type_id == 0x09 ||
+ header.message_type_id == 0x14) {
+ chunk_bytes = MIN (remaining_bytes, sc->in_chunk_size);
+ } else {
+ chunk_bytes = remaining_bytes;
+ }
data = g_bytes_get_data (sc->input_bytes, &size);
if (header.header_size + chunk_bytes > size) {
@@ -504,23 +521,72 @@ gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
header.message_length);
entry->payload = NULL;
- gst_rtmp_dump_data (entry->chunk->payload);
+ //gst_rtmp_dump_data (entry->chunk->payload);
- GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes",
- entry->chunk->message_length);
- g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
- g_object_unref (entry->chunk);
+ if (entry->chunk->stream_id == 0x02) {
+ GST_ERROR ("got protocol control message, type: %d",
+ entry->chunk->message_type_id);
+ gst_rtmp_connection_handle_pcm (sc, entry->chunk);
+ g_object_unref (entry->chunk);
+ } else {
+ GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes",
+ entry->chunk->message_length);
+ g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
+ g_object_unref (entry->chunk);
+ }
entry->chunk = NULL;
entry->offset = 0;
}
}
- GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT ", have %"
+ GST_DEBUG ("setting needed bytes to %" G_GSIZE_FORMAT ", have %"
G_GSIZE_FORMAT, needed_bytes, size);
gst_rtmp_connection_set_input_callback (sc,
gst_rtmp_connection_chunk_callback, needed_bytes);
}
+static void
+gst_rtmp_connection_handle_pcm (GstRtmpConnection * connection,
+ GstRtmpChunk * chunk)
+{
+ const guint8 *data;
+ gsize size;
+ guint32 moo;
+ guint32 moo2;
+ data = g_bytes_get_data (chunk->payload, &size);
+ switch (chunk->message_type_id) {
+ case 0x01:
+ moo = GST_READ_UINT32_BE (data);
+ GST_ERROR ("new chunk size %d", moo);
+ connection->in_chunk_size = moo;
+ break;
+ case 0x02:
+ moo = GST_READ_UINT32_BE (data);
+ GST_ERROR ("chunk abort, stream_id = %d", moo);
+ break;
+ case 0x03:
+ moo = GST_READ_UINT32_BE (data);
+ GST_ERROR ("acknowledgement %d", moo);
+ break;
+ case 0x04:
+ moo = GST_READ_UINT16_BE (data);
+ moo2 = GST_READ_UINT32_BE (data + 2);
+ GST_ERROR ("control: %d, %d", moo, moo2);
+ break;
+ case 0x05:
+ moo = GST_READ_UINT32_BE (data);
+ GST_ERROR ("window ack size: %d", moo);
+ break;
+ case 0x06:
+ moo = GST_READ_UINT32_BE (data);
+ GST_ERROR ("set peer bandwidth: %d", moo);
+ break;
+ default:
+ GST_ERROR ("unimplemented");
+ break;
+ }
+}
+
void
gst_rtmp_connection_queue_chunk (GstRtmpConnection * connection,
GstRtmpChunk * chunk)
diff --git a/rtmp/rtmpconnection.h b/rtmp/rtmpconnection.h
index 6823d81..deae843 100644
--- a/rtmp/rtmpconnection.h
+++ b/rtmp/rtmpconnection.h
@@ -59,6 +59,9 @@ struct _GstRtmpConnection
gboolean handshake_complete;
GstRtmpChunkCache *input_chunk_cache;
GstRtmpChunkCache *output_chunk_cache;
+
+ gsize in_chunk_size;
+ gsize out_chunk_size;
};
struct _GstRtmpConnectionClass
diff --git a/tools/proxy-server.c b/tools/proxy-server.c
index 0a953fd..e9e1d52 100644
--- a/tools/proxy-server.c
+++ b/tools/proxy-server.c
@@ -94,7 +94,7 @@ static void
add_connection (GstRtmpServer * server, GstRtmpConnection * connection,
gpointer user_data)
{
- GST_INFO ("new connection");
+ GST_ERROR ("new connection");
g_signal_connect (connection, "got-chunk", G_CALLBACK (got_chunk), NULL);
@@ -172,7 +172,8 @@ got_chunk_proxy (GstRtmpConnection * connection, GstRtmpChunk * chunk,
GST_ERROR ("<<<: %" G_GSIZE_FORMAT, chunk->message_length);
bytes = gst_rtmp_chunk_get_payload (chunk);
- gst_rtmp_dump_data (bytes);
+ if (0)
+ gst_rtmp_dump_data (bytes);
gst_rtmp_connection_queue_chunk (client_connection, chunk);
}