diff options
author | David Schleef <ds@schleef.org> | 2014-08-25 12:25:05 -0700 |
---|---|---|
committer | David Schleef <ds@schleef.org> | 2014-08-25 12:25:05 -0700 |
commit | d2e343f95296ac936cf25e2dc7c328e3b2bbe8de (patch) | |
tree | eef0cac5f6c2337c60a6aa584024e23dcd9f88ee | |
parent | 2525802c534bb87145088a5c69a6df2ff3f57542 (diff) |
rewrote chunk parsing
-rw-r--r-- | rtmp/rtmpchunk.c | 185 | ||||
-rw-r--r-- | rtmp/rtmpchunk.h | 18 | ||||
-rw-r--r-- | rtmp/rtmpconnection.c | 84 |
3 files changed, 155 insertions, 132 deletions
diff --git a/rtmp/rtmpchunk.c b/rtmp/rtmpchunk.c index 8f27e37..cc33183 100644 --- a/rtmp/rtmpchunk.c +++ b/rtmp/rtmpchunk.c @@ -128,89 +128,91 @@ gst_rtmp_chunk_new (void) return g_object_new (GST_TYPE_RTMP_CHUNK, NULL); } -static GstRtmpChunkParseStatus -chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes, - GstRtmpChunkCache * cache) +gboolean +gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader * header, GBytes * bytes) { - int offset; const guint8 *data; + const gsize sizes[4] = { 12, 8, 4, 1 }; + int stream_id; gsize size; - int header_fmt; - if (*needed_bytes) - *needed_bytes = 0; + data = g_bytes_get_data (bytes, &size); + header->format = data[0] >> 6; + header->header_size = sizes[header->format]; + + stream_id = data[0] & 0x3f; + if (stream_id == 0) { + if (size >= 2) + header->stream_id = 64 + data[1]; + header->header_size += 1; + } else if (stream_id == 1) { + if (size >= 3) + header->stream_id = 64 + data[1] + (data[2] << 8); + header->header_size += 2; + } else { + header->stream_id = stream_id; + } + + return (header->header_size <= size); +} + +gboolean +gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader * header, GBytes * bytes, + GstRtmpChunkHeader * previous_header) +{ + int offset; + const guint8 *data; + gsize size; data = g_bytes_get_data (bytes, &size); - if (size < 1) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - header_fmt = data[0] >> 6; /* librtmp: m_headerType */ - chunk->stream_id = data[0] & 0x3f; /* librtmp: m_nChannel */ + header->format = data[0] >> 6; + header->stream_id = data[0] & 0x3f; offset = 1; - if (chunk->stream_id == 0) { - if (size < 2) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - chunk->stream_id = 64 + data[1]; + if (header->stream_id == 0) { + header->stream_id = 64 + data[1]; offset = 2; - } else if (chunk->stream_id == 1) { - if (size < 3) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - chunk->stream_id = 64 + data[1] + (data[2] << 8); + } else if (header->stream_id == 1) { + header->stream_id = 64 + data[1] + (data[2] << 8); offset = 3; } - if (header_fmt == 0) { - if (size < offset + 11) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - /* librtmp: m_nTimeStamp */ - chunk->timestamp = + if (header->format == 0) { + header->timestamp = (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; - /* librtmp: m_nBodySize */ - chunk->message_length = + header->message_length = (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; - if (chunk->timestamp == 0xffffff) { - if (size < offset + 4 + 1 + 4) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - chunk->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) | + if (header->timestamp == 0xffffff) { + GST_ERROR ("unimplemented"); + g_assert_not_reached (); + header->timestamp = (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; offset += 4; } - chunk->message_type_id = data[offset]; /* librtmp: m_packetType */ + header->message_type_id = data[offset]; offset += 1; - /* librtmp: m_nInfoField2 */ - chunk->info = (data[offset] << 24) | (data[offset + 1] << 16) | + header->info = (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; offset += 4; } else { - GstRtmpChunkCacheEntry *cached; - - cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); - if (cached) { - chunk->timestamp = cached->timestamp; - chunk->message_length = cached->message_length; - chunk->message_type_id = cached->message_type_id; - chunk->info = cached->info; - } else { - GST_ERROR ("uncached chunk, stream_id = %d", chunk->stream_id); - } + header->timestamp = previous_header->timestamp; + header->message_length = previous_header->message_length; + header->message_type_id = previous_header->message_type_id; + header->info = previous_header->info; - if (header_fmt == 1) { - if (size < offset + 7) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - chunk->timestamp += + if (header->format == 1) { + header->timestamp += (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; - chunk->message_length = + header->message_length = (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; - chunk->message_type_id = data[offset]; + header->message_type_id = data[offset]; offset += 1; - } else if (header_fmt == 2) { - if (size < offset + 3) - return GST_RTMP_CHUNK_PARSE_UNKNOWN; - chunk->timestamp += + } else if (header->format == 2) { + header->timestamp += (data[offset] << 16) | (data[offset + 1] << 8) | data[offset + 2]; offset += 3; } else { @@ -218,61 +220,9 @@ chunk_parse (GstRtmpChunk * chunk, GBytes * bytes, gsize * needed_bytes, } } - /* WTF */ - if ((header_fmt < 2) && - (chunk->message_type_id == 0x12 || chunk->message_type_id == 0x09)) { - chunk->message_length += (chunk->message_length >> 7); - } -#if 1 - if (1) { - GBytes *b; - GST_ERROR ("PARSED CHUNK:"); - b = g_bytes_new_from_bytes (bytes, 0, size); - gst_rtmp_dump_data (b); - g_bytes_unref (b); - //*(int *)0 = 1; - } -#endif - - if (needed_bytes) - *needed_bytes = offset + chunk->message_length; - if (size < offset + chunk->message_length) - return GST_RTMP_CHUNK_PARSE_NEED_BYTES; - - chunk->payload = - g_bytes_new_from_bytes (bytes, offset, chunk->message_length); - return GST_RTMP_CHUNK_PARSE_OK; -} - -GstRtmpChunkParseStatus -gst_rtmp_chunk_can_parse (GBytes * bytes, gsize * chunk_size, - GstRtmpChunkCache * cache) -{ - GstRtmpChunk *chunk; - GstRtmpChunkParseStatus status; - - chunk = gst_rtmp_chunk_new (); - status = chunk_parse (chunk, bytes, chunk_size, cache); - g_object_unref (chunk); - - return status; -} - -GstRtmpChunk * -gst_rtmp_chunk_new_parse (GBytes * bytes, gsize * chunk_size, - GstRtmpChunkCache * cache) -{ - GstRtmpChunk *chunk; - GstRtmpChunkParseStatus status; - - chunk = gst_rtmp_chunk_new (); - status = chunk_parse (chunk, bytes, chunk_size, cache); - GST_ERROR ("status %d", status); - if (status == GST_RTMP_CHUNK_PARSE_OK) - return chunk; + header->header_size = offset; - g_object_unref (chunk); - return NULL; + return (header->header_size <= size); } GBytes * @@ -294,7 +244,7 @@ gst_rtmp_chunk_serialize (GstRtmpChunk * chunk, GstRtmpChunkCache * cache) cached = gst_rtmp_chunk_cache_get (cache, chunk->stream_id); if (cached) { header_fmt = 1; - timestamp = chunk->timestamp - cached->timestamp; + timestamp = chunk->timestamp - cached->previous_header.timestamp; } g_assert (chunk->stream_id < 64); @@ -389,10 +339,13 @@ gst_rtmp_chunk_cache_get (GstRtmpChunkCache * cache, int stream_id) GstRtmpChunkCacheEntry *entry; for (i = 0; i < cache->len; i++) { entry = &g_array_index (cache, GstRtmpChunkCacheEntry, i); - if (entry->stream_id == stream_id) + if (entry->previous_header.stream_id == stream_id) return entry; } - return NULL; + g_array_set_size (cache, cache->len + 1); + entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1); + entry->previous_header.stream_id = stream_id; + return entry; } void @@ -403,10 +356,10 @@ gst_rtmp_chunk_cache_update (GstRtmpChunkCache * cache, GstRtmpChunk * chunk) if (entry == NULL) { g_array_set_size (cache, cache->len + 1); entry = &g_array_index (cache, GstRtmpChunkCacheEntry, cache->len - 1); - entry->stream_id = chunk->stream_id; + entry->previous_header.stream_id = chunk->stream_id; } - entry->timestamp = chunk->timestamp; - entry->message_length = chunk->message_length; - entry->message_type_id = chunk->message_type_id; - entry->info = chunk->info; + entry->previous_header.timestamp = chunk->timestamp; + entry->previous_header.message_length = chunk->message_length; + entry->previous_header.message_type_id = chunk->message_type_id; + entry->previous_header.info = chunk->info; } diff --git a/rtmp/rtmpchunk.h b/rtmp/rtmpchunk.h index 267e871..4834e35 100644 --- a/rtmp/rtmpchunk.h +++ b/rtmp/rtmpchunk.h @@ -34,7 +34,11 @@ typedef struct _GstRtmpChunk GstRtmpChunk; typedef struct _GstRtmpChunkClass GstRtmpChunkClass; typedef GArray GstRtmpChunkCache; typedef struct _GstRtmpChunkCacheEntry GstRtmpChunkCacheEntry; -struct _GstRtmpChunkCacheEntry { +typedef struct _GstRtmpChunkHeader GstRtmpChunkHeader; + +struct _GstRtmpChunkHeader { + int format; + gsize header_size; guint32 stream_id; guint32 timestamp; gsize message_length; @@ -42,6 +46,13 @@ struct _GstRtmpChunkCacheEntry { guint32 info; }; +struct _GstRtmpChunkCacheEntry { + GstRtmpChunkHeader previous_header; + GstRtmpChunk *chunk; + guint8 *payload; + gsize offset; +}; + struct _GstRtmpChunk { GObject object; @@ -86,6 +97,11 @@ guint32 gst_rtmp_chunk_get_stream_id (GstRtmpChunk *chunk); guint32 gst_rtmp_chunk_get_timestamp (GstRtmpChunk *chunk); GBytes * gst_rtmp_chunk_get_payload (GstRtmpChunk *chunk); +gboolean gst_rtmp_chunk_parse_header1 (GstRtmpChunkHeader *header, GBytes * bytes); +gboolean gst_rtmp_chunk_parse_header2 (GstRtmpChunkHeader *header, GBytes * bytes, + GstRtmpChunkHeader *previous_header); + + /* chunk cache */ GstRtmpChunkCache *gst_rtmp_chunk_cache_new (void); diff --git a/rtmp/rtmpconnection.c b/rtmp/rtmpconnection.c index 6e37caa..e8dc6ae 100644 --- a/rtmp/rtmpconnection.c +++ b/rtmp/rtmpconnection.c @@ -437,34 +437,88 @@ gst_rtmp_connection_server_handshake2 (GstRtmpConnection * sc) static void gst_rtmp_connection_chunk_callback (GstRtmpConnection * sc) { - GstRtmpChunk *chunk; + gsize needed_bytes = 0; gsize size = 0; while (1) { + GstRtmpChunkHeader header = { 0 }; + GstRtmpChunkCacheEntry *entry; GBytes *bytes; + gboolean ret; + gsize remaining_bytes; + gsize chunk_bytes; + const guint8 *data; if (sc->input_bytes == NULL) break; - chunk = gst_rtmp_chunk_new_parse (sc->input_bytes, &size, - sc->input_chunk_cache); - if (chunk == NULL) + ret = gst_rtmp_chunk_parse_header1 (&header, sc->input_bytes); + if (!ret) { + needed_bytes = header.header_size; + break; + } + + entry = gst_rtmp_chunk_cache_get (sc->input_chunk_cache, header.stream_id); + + if (entry->chunk && header.format != 3) { + GST_ERROR ("expected message continuation, got new message"); + } + + ret = gst_rtmp_chunk_parse_header2 (&header, sc->input_bytes, + &entry->previous_header); + if (!ret) { + needed_bytes = header.header_size; break; + } - gst_rtmp_chunk_cache_update (sc->input_chunk_cache, chunk); + remaining_bytes = header.message_length - entry->offset; + chunk_bytes = MIN (remaining_bytes, 128); + data = g_bytes_get_data (sc->input_bytes, &size); - bytes = gst_rtmp_connection_take_input_bytes (sc, size); + if (header.header_size + chunk_bytes > size) { + needed_bytes = header.header_size + chunk_bytes; + break; + } + + if (entry->chunk == NULL) { + entry->chunk = gst_rtmp_chunk_new (); + entry->chunk->stream_id = header.stream_id; + entry->chunk->timestamp = header.timestamp; + entry->chunk->message_length = header.message_length; + entry->chunk->message_type_id = header.message_type_id; + entry->chunk->info = header.info; + entry->payload = g_malloc (header.message_length); + } + memcpy (&entry->previous_header, &header, sizeof (header)); + + memcpy (entry->payload + entry->offset, data + header.header_size, + chunk_bytes); + entry->offset += chunk_bytes; + + bytes = gst_rtmp_connection_take_input_bytes (sc, + header.header_size + chunk_bytes); g_bytes_unref (bytes); - GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", chunk->message_length); - g_signal_emit_by_name (sc, "got-chunk", chunk); - g_object_unref (chunk); + if (entry->offset == header.message_length) { + entry->chunk->payload = g_bytes_new_take (entry->payload, + header.message_length); + entry->payload = NULL; - size = 0; + gst_rtmp_dump_data (entry->chunk->payload); + + GST_ERROR ("got chunk: %" G_GSIZE_FORMAT " bytes", + entry->chunk->message_length); + g_signal_emit_by_name (sc, "got-chunk", entry->chunk); + g_object_unref (entry->chunk); + + entry->chunk = NULL; + entry->offset = 0; + } } - GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT, size); + GST_ERROR ("setting needed bytes to %" G_GSIZE_FORMAT ", have %" + G_GSIZE_FORMAT, needed_bytes, size); gst_rtmp_connection_set_input_callback (sc, - gst_rtmp_connection_chunk_callback, size); + gst_rtmp_connection_chunk_callback, needed_bytes); } void @@ -625,10 +679,10 @@ gst_rtmp_connection_client_handshake2_done (GObject * obj, GST_ERROR ("spare bytes after handshake: %" G_GSIZE_FORMAT, g_bytes_get_size (sc->input_bytes)); gst_rtmp_connection_chunk_callback (sc); + } else { + gst_rtmp_connection_set_input_callback (sc, + gst_rtmp_connection_chunk_callback, 0); } - - gst_rtmp_connection_set_input_callback (sc, - gst_rtmp_connection_chunk_callback, 0); gst_rtmp_connection_start_output (sc); } |