diff options
author | Benjamin Otte <otte@gnome.org> | 2008-12-08 19:58:35 +0100 |
---|---|---|
committer | Benjamin Otte <otte@gnome.org> | 2008-12-16 15:16:38 +0100 |
commit | 75bd1b2ee87e14b106b74c64f06fad7063b215fc (patch) | |
tree | 2a43efd57a726c35e1fe828c6d468b0225cef8b2 | |
parent | c07252c9beb8ef750c3d3ea62882da3a4b26aa90 (diff) |
first step in (again) reworking the RTMP handling
-rw-r--r-- | swfdec/Makefile.am | 2 | ||||
-rw-r--r-- | swfdec/swfdec_rtmp_connection.c | 68 | ||||
-rw-r--r-- | swfdec/swfdec_rtmp_connection.h | 8 | ||||
-rw-r--r-- | swfdec/swfdec_rtmp_header.c | 4 | ||||
-rw-r--r-- | swfdec/swfdec_rtmp_packet.c | 11 | ||||
-rw-r--r-- | swfdec/swfdec_rtmp_packet.h | 1 | ||||
-rw-r--r-- | swfdec/swfdec_rtmp_socket.c | 69 |
7 files changed, 133 insertions, 30 deletions
diff --git a/swfdec/Makefile.am b/swfdec/Makefile.am index 5a0e33b6..38d49907 100644 --- a/swfdec/Makefile.am +++ b/swfdec/Makefile.am @@ -140,6 +140,7 @@ libswfdec_source_files = \ swfdec_rtmp_rpc_channel.c \ swfdec_rtmp_socket.c \ swfdec_rtmp_socket_rtmp.c \ + swfdec_rtmp_stream.c \ swfdec_rtmp_video_channel.c \ swfdec_sandbox.c \ swfdec_script.c \ @@ -340,6 +341,7 @@ noinst_HEADERS = \ swfdec_rtmp_rpc_channel.h \ swfdec_rtmp_socket.h \ swfdec_rtmp_socket_rtmp.h \ + swfdec_rtmp_stream.h \ swfdec_rtmp_video_channel.h \ swfdec_sandbox.h \ swfdec_script_internal.h \ diff --git a/swfdec/swfdec_rtmp_connection.c b/swfdec/swfdec_rtmp_connection.c index 300842b4..e5042de3 100644 --- a/swfdec/swfdec_rtmp_connection.c +++ b/swfdec/swfdec_rtmp_connection.c @@ -32,10 +32,36 @@ #include "swfdec_rtmp_handshake_channel.h" #include "swfdec_rtmp_rpc_channel.h" #include "swfdec_rtmp_socket.h" +#include "swfdec_rtmp_stream.h" + +/*** SwfdecRtmpStream ***/ + +static void +swfdec_rtmp_connection_rtmp_stream_receive (SwfdecRtmpStream *stream, + const SwfdecRtmpHeader *header, SwfdecBuffer *buffer) +{ + SwfdecRtmpConnection *conn = SWFDEC_RTMP_CONNECTION (stream); + SwfdecRtmpChannel *channel = swfdec_rtmp_connection_get_channel (conn, header->channel); + SwfdecRtmpChannelClass *klass; + + if (channel == NULL) { + SWFDEC_FIXME ("woot, no channel %u", header->channel); + return; + } + klass = SWFDEC_RTMP_CHANNEL_GET_CLASS (channel); + klass->receive (channel, header, buffer); +} + +static void +swfdec_rtmp_connection_rtmp_stream_init (SwfdecRtmpStreamInterface *iface) +{ + iface->receive = swfdec_rtmp_connection_rtmp_stream_receive; +} /*** SwfdecRtmpConnection ***/ -G_DEFINE_TYPE (SwfdecRtmpConnection, swfdec_rtmp_connection, SWFDEC_TYPE_AS_RELAY) +G_DEFINE_TYPE_WITH_CODE (SwfdecRtmpConnection, swfdec_rtmp_connection, SWFDEC_TYPE_AS_RELAY, + G_IMPLEMENT_INTERFACE (SWFDEC_TYPE_RTMP_STREAM, swfdec_rtmp_connection_rtmp_stream_init)) static void swfdec_rtmp_connection_mark (SwfdecGcObject *object) @@ -64,6 +90,15 @@ swfdec_rtmp_connection_dispose (GObject *object) g_free (conn->error); conn->error = NULL; + if (conn->incoming) { + g_hash_table_destroy (conn->incoming); + conn->incoming = NULL; + } + if (conn->streams) { + g_hash_table_destroy (conn->streams); + conn->streams = NULL; + } + G_OBJECT_CLASS (swfdec_rtmp_connection_parent_class)->dispose (object); } @@ -81,8 +116,14 @@ swfdec_rtmp_connection_class_init (SwfdecRtmpConnectionClass *klass) static void swfdec_rtmp_connection_init (SwfdecRtmpConnection *conn) { + conn->incoming = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, (GDestroyNotify) swfdec_rtmp_packet_free); + conn->streams = g_hash_table_new (g_direct_hash, g_direct_equal); + conn->read_size = SWFDEC_RTMP_BLOCK_SIZE; conn->write_size = SWFDEC_RTMP_BLOCK_SIZE; + + swfdec_rtmp_register_stream (conn, 0, SWFDEC_RTMP_STREAM (conn)); } void @@ -197,3 +238,28 @@ swfdec_rtmp_connection_get_channel (SwfdecRtmpConnection *conn, guint id) return NULL; } +void +swfdec_rtmp_register_stream (SwfdecRtmpConnection *conn, + guint id, SwfdecRtmpStream *stream) +{ + g_return_if_fail (SWFDEC_IS_RTMP_CONNECTION (conn)); + g_return_if_fail (SWFDEC_IS_RTMP_STREAM (stream)); + + if (g_hash_table_lookup (conn->streams, GUINT_TO_POINTER (id))) { + SWFDEC_FIXME ("stream %u is already registered, ignoring new request", + id); + return; + } + + g_hash_table_insert (conn->streams, GUINT_TO_POINTER (id), stream); +} + +void +swfdec_rtmp_unregister_stream (SwfdecRtmpConnection *conn, guint id) +{ + g_return_if_fail (SWFDEC_IS_RTMP_CONNECTION (conn)); + + if (!g_hash_table_remove (conn->streams, GUINT_TO_POINTER (id))) { + g_assert_not_reached (); + } +} diff --git a/swfdec/swfdec_rtmp_connection.h b/swfdec/swfdec_rtmp_connection.h index f2944175..eb379589 100644 --- a/swfdec/swfdec_rtmp_connection.h +++ b/swfdec/swfdec_rtmp_connection.h @@ -56,6 +56,8 @@ struct _SwfdecRtmpConnection { GList * last_send; /* list entry of last channel sent to */ SwfdecRtmpChannel * handshake; /* channel used for doing initial handshake or NULL */ char * error; /* NULL or debug string for error message */ + GHashTable * incoming; /* channel id => incoming packets */ + GHashTable * streams; /* stream id => stream */ guint read_size; /* size of a block of data when reading */ guint write_size; /* size of a block of data when writing */ @@ -85,6 +87,12 @@ void swfdec_rtmp_connection_errorv (SwfdecRtmpConnection * conn, void swfdec_rtmp_connection_on_status (SwfdecRtmpConnection * conn, SwfdecAsValue value); +void swfdec_rtmp_register_stream (SwfdecRtmpConnection * conn, + guint id, + SwfdecRtmpStream * stream); +void swfdec_rtmp_unregister_stream (SwfdecRtmpConnection * conn, + guint id); + #define swfdec_rtmp_connection_get_handshake_channel(conn) ((conn)->handshake) #define swfdec_rtmp_connection_get_command_channel(conn) (swfdec_rtmp_connection_get_channel (conn, 2)) #define swfdec_rtmp_connection_get_rpc_channel(conn) (swfdec_rtmp_connection_get_channel (conn, 3)) diff --git a/swfdec/swfdec_rtmp_header.c b/swfdec/swfdec_rtmp_header.c index cb3f6bfc..b0836b23 100644 --- a/swfdec/swfdec_rtmp_header.c +++ b/swfdec/swfdec_rtmp_header.c @@ -23,6 +23,8 @@ #include "swfdec_rtmp_header.h" +#include <string.h> + gsize swfdec_rtmp_header_peek_size (guint first_byte) { @@ -51,7 +53,7 @@ swfdec_rtmp_header_invalidate (SwfdecRtmpHeader *header) { g_return_if_fail (header != NULL); - header->channel = (guint) -1; + memset (header, 0, sizeof (SwfdecRtmpHeader)); } void diff --git a/swfdec/swfdec_rtmp_packet.c b/swfdec/swfdec_rtmp_packet.c index 58a0a9ea..ae3d1332 100644 --- a/swfdec/swfdec_rtmp_packet.c +++ b/swfdec/swfdec_rtmp_packet.c @@ -24,6 +24,12 @@ #include "swfdec_rtmp_packet.h" SwfdecRtmpPacket * +swfdec_rtmp_packet_new_empty (void) +{ + return g_slice_new0 (SwfdecRtmpPacket); +} + +SwfdecRtmpPacket * swfdec_rtmp_packet_new (SwfdecRtmpPacketType type, guint timestamp, SwfdecBuffer *buffer) { @@ -31,7 +37,7 @@ swfdec_rtmp_packet_new (SwfdecRtmpPacketType type, guint timestamp, g_return_val_if_fail (buffer != NULL, NULL); - packet = g_slice_new0 (SwfdecRtmpPacket); + packet = swfdec_rtmp_packet_new_empty (); packet->header.type = type; packet->header.timestamp = timestamp; packet->buffer = swfdec_buffer_ref (buffer); @@ -44,7 +50,8 @@ swfdec_rtmp_packet_free (SwfdecRtmpPacket *packet) { g_return_if_fail (packet != NULL); - swfdec_buffer_unref (packet->buffer); + if (packet->buffer) + swfdec_buffer_unref (packet->buffer); g_slice_free (SwfdecRtmpPacket, packet); } diff --git a/swfdec/swfdec_rtmp_packet.h b/swfdec/swfdec_rtmp_packet.h index 3f83ca71..96482f04 100644 --- a/swfdec/swfdec_rtmp_packet.h +++ b/swfdec/swfdec_rtmp_packet.h @@ -33,6 +33,7 @@ struct _SwfdecRtmpPacket { SwfdecBuffer * buffer; /* contents of packet */ }; +SwfdecRtmpPacket * swfdec_rtmp_packet_new_empty (void); SwfdecRtmpPacket * swfdec_rtmp_packet_new (SwfdecRtmpPacketType type, guint timestamp, SwfdecBuffer * buffer); diff --git a/swfdec/swfdec_rtmp_socket.c b/swfdec/swfdec_rtmp_socket.c index 955eb608..5232f4ba 100644 --- a/swfdec/swfdec_rtmp_socket.c +++ b/swfdec/swfdec_rtmp_socket.c @@ -23,9 +23,12 @@ #include "swfdec_rtmp_socket.h" +#include <string.h> + #include "swfdec_debug.h" #include "swfdec_rtmp_handshake_channel.h" #include "swfdec_player_internal.h" +#include "swfdec_rtmp_stream.h" /* socket implementations for swfdec_rtmp_socket_new() */ #include "swfdec_rtmp_socket_rtmp.h" @@ -147,7 +150,7 @@ void swfdec_rtmp_socket_receive (SwfdecRtmpSocket *sock, SwfdecBufferQueue *queue) { SwfdecRtmpConnection *conn; - SwfdecRtmpChannel *channel; + SwfdecRtmpPacket *packet; SwfdecRtmpHeader header; SwfdecBuffer *buffer; SwfdecBits bits; @@ -181,43 +184,57 @@ swfdec_rtmp_socket_receive (SwfdecRtmpSocket *sock, SwfdecBufferQueue *queue) break; swfdec_bits_init (&bits, buffer); i = swfdec_rtmp_header_peek_channel (&bits); - channel = swfdec_rtmp_connection_get_channel (conn, i); - if (channel == NULL) { - swfdec_rtmp_connection_error (conn, - "message on unknown channel %u, what now?", i); - return; + packet = g_hash_table_lookup (conn->incoming, GUINT_TO_POINTER (i)); + if (packet) { + swfdec_rtmp_header_copy (&header, &packet->header); + } else { + swfdec_rtmp_header_invalidate (&header); } - if (header_size >= 4 && swfdec_buffer_queue_get_depth (channel->recv_queue)) { - SWFDEC_ERROR ("not a continuation header, but old command not finished yet, dropping old command"); - swfdec_buffer_queue_flush (channel->recv_queue, swfdec_buffer_queue_get_depth (channel->recv_queue)); - } - swfdec_rtmp_header_copy (&header, &channel->recv_cache); swfdec_rtmp_header_read (&header, &bits); swfdec_buffer_unref (buffer); /* read the data chunk */ - remaining = header.size - swfdec_buffer_queue_get_depth (channel->recv_queue); + remaining = header.size; + if (packet && packet->buffer) + remaining -= packet->buffer->length; remaining = MIN (remaining, conn->read_size); if (header_size + remaining > swfdec_buffer_queue_get_depth (queue)) return; + + if (packet == NULL) { + packet = swfdec_rtmp_packet_new_empty (); + g_hash_table_insert (conn->incoming, GUINT_TO_POINTER (i), packet); + } else if (header_size >= 4 && packet->buffer != NULL) { + SWFDEC_ERROR ("not a continuation header, but old command not finished yet, dropping old command"); + swfdec_buffer_unref (packet->buffer); + packet->buffer = NULL; + } + if (packet->buffer == NULL) { + packet->buffer = swfdec_buffer_new (header.size); + /* we store the actual size of the buffer in packet->header.size, and + * use length to count how much data we already received */ + packet->buffer->length = 0; + } + swfdec_rtmp_header_copy (&packet->header, &header); + swfdec_buffer_queue_flush (queue, header_size); buffer = swfdec_buffer_queue_pull (queue, remaining); g_assert (buffer); - swfdec_buffer_queue_push (channel->recv_queue, buffer); - swfdec_rtmp_header_copy (&channel->recv_cache, &header); - - /* process the buffer if it's received completely */ - buffer = swfdec_buffer_queue_pull (channel->recv_queue, header.size); - if (buffer) { - SwfdecRtmpChannelClass *klass = SWFDEC_RTMP_CHANNEL_GET_CLASS (channel); - - g_assert (swfdec_buffer_queue_get_depth (channel->recv_queue) == 0); - if (header.stream != channel->stream_id) { - SWFDEC_FIXME ("channel has stream id %u, but message has stream id %u, is this bad?", - channel->stream_id, header.stream); + /* we allocate the buffer so it's big enough */ + memcpy (packet->buffer->data + packet->buffer->length, buffer->data, remaining); + packet->buffer->length += remaining; + swfdec_buffer_unref (buffer); + + if (packet->buffer->length == header.size) { + SwfdecRtmpStream *stream = g_hash_table_lookup (conn->streams, GUINT_TO_POINTER (header.stream)); + + if (stream) { + swfdec_rtmp_stream_receive (stream, packet); + } else { + SWFDEC_FIXME ("packet (type %u) for unknown stream %u", header.type, header.stream); } - klass->receive (channel, &header, buffer); - swfdec_buffer_unref (buffer); + swfdec_buffer_unref (packet->buffer); + packet->buffer = NULL; } } while (TRUE); } |