summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Schleef <ds@schleef.org>2014-08-25 12:25:05 -0700
committerDavid Schleef <ds@schleef.org>2014-08-25 12:25:05 -0700
commitd2e343f95296ac936cf25e2dc7c328e3b2bbe8de (patch)
treeeef0cac5f6c2337c60a6aa584024e23dcd9f88ee
parent2525802c534bb87145088a5c69a6df2ff3f57542 (diff)
rewrote chunk parsing
-rw-r--r--rtmp/rtmpchunk.c185
-rw-r--r--rtmp/rtmpchunk.h18
-rw-r--r--rtmp/rtmpconnection.c84
3 files changed, 155 insertions, 132 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c
index 8f27e37..cc33183 100644
--- a/rtmp/rtmpchunk.c
+++ b/rtmp/rtmpchunk.c
@@ -128,89 +128,91 @@ gst_rtmp_chunk_new (void)
return g_object_new (GST_TYPE_RTMP_CHUNK, NULL);
}
-static GstRtmpChunkParseStatus
-chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes,
- GstRtmpChunkCache * cache)
+gboolean
+gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader * header, GBytes * bytes)
{
- int offset;
const guint8 *data;
+ const gsize sizes[4] = { 12, 8, 4, 1 };
+ int stream_id;
gsize size;
- int header_fmt;
- if (*needed_bytes)
- *needed_bytes = 0;
+ data = g_bytes_get_data (bytes, &size);
+ header->format = data[0] >> 6;
+ header->header_size = sizes[header->format];
+
+ stream_id = data[0] & 0x3f;
+ if (stream_id == 0) {
+ if (size >= 2)
+ header->stream_id = 64 + data[1];
+ header->header_size += 1;
+ } else if (stream_id == 1) {
+ if (size >= 3)
+ header->stream_id = 64 + data[1] + (data[2] << 8);
+ header->header_size += 2;
+ } else {
+ header->stream_id = stream_id;
+ }
+
+ return (header->header_size <= size);
+}
+
+gboolean
+gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes,
+ GstRtmpChunkHeader * previous_header)
+{
+ int offset;
+ const guint8 *data;
+ gsize size;
data = g_bytes_get_data (bytes, &size);
- if (size < 1)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- header_fmt = data[0] >> 6; /* librtmp: m_headerType */
- chunk->stream_id = data[0] & 0x3f; /* librtmp: m_nChannel */
+ header->format = data[0] >> 6;
+ header->stream_id = data[0] & 0x3f;
offset = 1;
- if (chunk->stream_id == 0) {
- if (size < 2)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- chunk->stream_id = 64 + data[1];
+ if (header->stream_id == 0) {
+ header->stream_id = 64 + data[1];
offset = 2;
- } else if (chunk->stream_id == 1) {
- if (size < 3)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- chunk->stream_id = 64 + data[1] + (data[2] << 8);
+ } else if (header->stream_id == 1) {
+ header->stream_id = 64 + data[1] + (data[2] << 8);
offset = 3;
}
- if (header_fmt == 0) {
- if (size < offset + 11)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- /* librtmp: m_nTimeStamp */
- chunk->timestamp =
+ if (header->format == 0) {
+ header->timestamp =
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
- /* librtmp: m_nBodySize */
- chunk->message_length =
+ header->message_length =
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
- if (chunk->timestamp == 0xffffff) {
- if (size < offset + 4 + 1 + 4)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- chunk->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) |
+ if (header->timestamp == 0xffffff) {
+ GST_ERROR ("unimplemented");
+ g_assert_not_reached ();
+ header->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) |
(data[offset + 2] << 8) | data[offset + 3];
offset += 4;
}
- chunk->message_type_id = data[offset]; /* librtmp: m_packetType */
+ header->message_type_id = data[offset];
offset += 1;
- /* librtmp: m_nInfoField2 */
- chunk->info = (data[offset] << 24) | (data[offset + 1] << 16) |
+ header->info = (data[offset] << 24) | (data[offset + 1] << 16) |
(data[offset + 2] << 8) | data[offset + 3];
offset += 4;
} else {
- GstRtmpChunkCacheEntry *cached;
-
- cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
- if (cached) {
- chunk->timestamp = cached->timestamp;
- chunk->message_length = cached->message_length;
- chunk->message_type_id = cached->message_type_id;
- chunk->info = cached->info;
- } else {
- GST_ERROR ("uncached chunk, stream_id = %d", chunk->stream_id);
- }
+ header->timestamp = previous_header->timestamp;
+ header->message_length = previous_header->message_length;
+ header->message_type_id = previous_header->message_type_id;
+ header->info = previous_header->info;
- if (header_fmt == 1) {
- if (size < offset + 7)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- chunk->timestamp +=
+ if (header->format == 1) {
+ header->timestamp +=
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
- chunk->message_length =
+ header->message_length =
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
- chunk->message_type_id = data[offset];
+ header->message_type_id = data[offset];
offset += 1;
- } else if (header_fmt == 2) {
- if (size < offset + 3)
- return GST_RTMP_CHUNK_PARSE_UNKNOWN;
- chunk->timestamp +=
+ } else if (header->format == 2) {
+ header->timestamp +=
(data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2];
offset += 3;
} else {
@@ -218,61 +220,9 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes,
}
}
- /* WTF */
- if ((header_fmt < 2) &&
- (chunk->message_type_id == 0x12 || chunk->message_type_id == 0x09)) {
- chunk->message_length += (chunk->message_length >> 7);
- }
-#if 1
- if (1) {
- GBytes *b;
- GST_ERROR ("PARSED CHUNK:");
- b = g_bytes_new_from_bytes (bytes, 0, size);
- gst_rtmp_dump_data (b);
- g_bytes_unref (b);
- //*(int *)0 = 1;
- }
-#endif
-
- if (needed_bytes)
- *needed_bytes = offset + chunk->message_length;
- if (size < offset + chunk->message_length)
- return GST_RTMP_CHUNK_PARSE_NEED_BYTES;
-
- chunk->payload =
- g_bytes_new_from_bytes (bytes, offset, chunk->message_length);
- return GST_RTMP_CHUNK_PARSE_OK;
-}
-
-GstRtmpChunkParseStatus
-gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size,
- GstRtmpChunkCache * cache)
-{
- GstRtmpChunk *chunk;
- GstRtmpChunkParseStatus status;
-
- chunk = gst_rtmp_chunk_new ();
- status = chunk_parse (chunk, bytes, chunk_size, cache);
- g_object_unref (chunk);
-
- return status;
-}
-
-GstRtmpChunk *
-gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size,
- GstRtmpChunkCache * cache)
-{
- GstRtmpChunk *chunk;
- GstRtmpChunkParseStatus status;
-
- chunk = gst_rtmp_chunk_new ();
- status = chunk_parse (chunk, bytes, chunk_size, cache);
- GST_ERROR ("status %d", status);
- if (status == GST_RTMP_CHUNK_PARSE_OK)
- return chunk;
+ header->header_size = offset;
- g_object_unref (chunk);
- return NULL;
+ return (header->header_size <= size);
}
GBytes *
@@ -294,7 +244,7 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache)
cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id);
if (cached) {
header_fmt = 1;
- timestamp = chunk->timestamp - cached->timestamp;
+ timestamp = chunk->timestamp - cached->previous_header.timestamp;
}
g_assert (chunk->stream_id < 64);
@@ -389,10 +339,13 @@ gst_rtmp_chunk_cache_get (GstRtmpChunkCache * cache, int stream_id)
GstRtmpChunkCacheEntry *entry;
for (i = 0; i < cache->len; i++) {
entry = &g_array_index (cache, GstRtmpChunkCacheEntry, i);
- if (entry->stream_id == stream_id)
+ if (entry->previous_header.stream_id == stream_id)
return entry;
}
- return NULL;
+ g_array_set_size (cache, cache->len + 1);
+ entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1);
+ entry->previous_header.stream_id = stream_id;
+ return entry;
}
void
@@ -403,10 +356,10 @@ gst_rtmp_chunk_cache_update (GstRtmpChunkCache * cache, GstRtmpChunk * chunk)
if (entry == NULL) {
g_array_set_size (cache, cache->len + 1);
entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1);
- entry->stream_id = chunk->stream_id;
+ entry->previous_header.stream_id = chunk->stream_id;
}
- entry->timestamp = chunk->timestamp;
- entry->message_length = chunk->message_length;
- entry->message_type_id = chunk->message_type_id;
- entry->info = chunk->info;
+ entry->previous_header.timestamp = chunk->timestamp;
+ entry->previous_header.message_length = chunk->message_length;
+ entry->previous_header.message_type_id = chunk->message_type_id;
+ entry->previous_header.info = chunk->info;
}
diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h
index 267e871..4834e35 100644
--- a/rtmp/rtmpchunk.h
+++ b/rtmp/rtmpchunk.h
@@ -34,7 +34,11 @@ typedef struct _GstRtmpChunk GstRtmpChunk;
typedef struct _GstRtmpChunkClass GstRtmpChunkClass;
typedef GArray GstRtmpChunkCache;
typedef struct _GstRtmpChunkCacheEntry GstRtmpChunkCacheEntry;
-struct _GstRtmpChunkCacheEntry {
+typedef struct _GstRtmpChunkHeader GstRtmpChunkHeader;
+
+struct _GstRtmpChunkHeader {
+ int format;
+ gsize header_size;
guint32 stream_id;
guint32 timestamp;
gsize message_length;
@@ -42,6 +46,13 @@ struct _GstRtmpChunkCacheEntry {
guint32 info;
};
+struct _GstRtmpChunkCacheEntry {
+ GstRtmpChunkHeader previous_header;
+ GstRtmpChunk *chunk;
+ guint8 *payload;
+ gsize offset;
+};
+
struct _GstRtmpChunk
{
GObject object;
@@ -86,6 +97,11 @@ guint32 gst_rtmp_chunk_get_stream_id (GstRtmpChunk *chunk);
guint32 gst_rtmp_chunk_get_timestamp (GstRtmpChunk *chunk);
GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk);
+gboolean gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader *header, GBytes * bytes);
+gboolean gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader *header, GBytes * bytes,
+ GstRtmpChunkHeader *previous_header);
+
+
/* chunk cache */
GstRtmpChunkCache *gst_rtmp_chunk_cache_new (void);
diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c
index 6e37caa..e8dc6ae 100644
--- a/rtmp/rtmpconnection.c
+++ b/rtmp/rtmpconnection.c
@@ -437,34 +437,88 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc)
static void
gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc)
{
- GstRtmpChunk *chunk;
+ gsize needed_bytes = 0;
gsize size = 0;
while (1) {
+ GstRtmpChunkHeader header = { 0 };
+ GstRtmpChunkCacheEntry *entry;
GBytes *bytes;
+ gboolean ret;
+ gsize remaining_bytes;
+ gsize chunk_bytes;
+ const guint8 *data;
if (sc->input_bytes == NULL)
break;
- chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size,
- sc->input_chunk_cache);
- if (chunk == NULL)
+ ret = gst_rtmp_chunk_parse_header1 (&header, sc->input_bytes);
+ if (!ret) {
+ needed_bytes = header.header_size;
+ break;
+ }
+
+ entry = gst_rtmp_chunk_cache_get (sc->input_chunk_cache, header.stream_id);
+
+ if (entry->chunk && header.format != 3) {
+ GST_ERROR ("expected message continuation, got new message");
+ }
+
+ ret = gst_rtmp_chunk_parse_header2 (&header, sc->input_bytes,
+ &entry->previous_header);
+ if (!ret) {
+ needed_bytes = header.header_size;
break;
+ }
- gst_rtmp_chunk_cache_update (sc->input_chunk_cache, chunk);
+ remaining_bytes = header.message_length - entry->offset;
+ chunk_bytes = MIN (remaining_bytes, 128);
+ data = g_bytes_get_data (sc->input_bytes, &size);
- bytes = gst_rtmp_connection_take_input_bytes (sc, size);
+ if (header.header_size + chunk_bytes > size) {
+ needed_bytes = header.header_size + chunk_bytes;
+ break;
+ }
+
+ if (entry->chunk == NULL) {
+ entry->chunk = gst_rtmp_chunk_new ();
+ entry->chunk->stream_id = header.stream_id;
+ entry->chunk->timestamp = header.timestamp;
+ entry->chunk->message_length = header.message_length;
+ entry->chunk->message_type_id = header.message_type_id;
+ entry->chunk->info = header.info;
+ entry->payload = g_malloc (header.message_length);
+ }
+ memcpy (&entry->previous_header, &header, sizeof (header));
+
+ memcpy (entry->payload + entry->offset, data + header.header_size,
+ chunk_bytes);
+ entry->offset += chunk_bytes;
+
+ bytes = gst_rtmp_connection_take_input_bytes (sc,
+ header.header_size + chunk_bytes);
g_bytes_unref (bytes);
- GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length);
- g_signal_emit_by_name (sc, "got-chunk", chunk);
- g_object_unref (chunk);
+ if (entry->offset == header.message_length) {
+ entry->chunk->payload = g_bytes_new_take (entry->payload,
+ header.message_length);
+ entry->payload = NULL;
- size = 0;
+ gst_rtmp_dump_data (entry->chunk->payload);
+
+ GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes",
+ entry->chunk->message_length);
+ g_signal_emit_by_name (sc, "got-chunk", entry->chunk);
+ g_object_unref (entry->chunk);
+
+ entry->chunk = NULL;
+ entry->offset = 0;
+ }
}
- GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT, size);
+ GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT ", have %"
+ G_GSIZE_FORMAT, needed_bytes, size);
gst_rtmp_connection_set_input_callback (sc,
- gst_rtmp_connection_chunk_callback, size);
+ gst_rtmp_connection_chunk_callback, needed_bytes);
}
void
@@ -625,10 +679,10 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj,
GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT,
g_bytes_get_size (sc->input_bytes));
gst_rtmp_connection_chunk_callback (sc);
+ } else {
+ gst_rtmp_connection_set_input_callback (sc,
+ gst_rtmp_connection_chunk_callback, 0);
}
-
- gst_rtmp_connection_set_input_callback (sc,
- gst_rtmp_connection_chunk_callback, 0);
gst_rtmp_connection_start_output (sc);
}