summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Otte <otte@gnome.org>2008-12-08 19:58:35 +0100
committerBenjamin Otte <otte@gnome.org>2008-12-16 15:16:38 +0100
commit75bd1b2ee87e14b106b74c64f06fad7063b215fc (patch)
tree2a43efd57a726c35e1fe828c6d468b0225cef8b2
parentc07252c9beb8ef750c3d3ea62882da3a4b26aa90 (diff)
first step in (again) reworking the RTMP handling
-rw-r--r--swfdec/Makefile.am2
-rw-r--r--swfdec/swfdec_rtmp_connection.c68
-rw-r--r--swfdec/swfdec_rtmp_connection.h8
-rw-r--r--swfdec/swfdec_rtmp_header.c4
-rw-r--r--swfdec/swfdec_rtmp_packet.c11
-rw-r--r--swfdec/swfdec_rtmp_packet.h1
-rw-r--r--swfdec/swfdec_rtmp_socket.c69
7 files changed, 133 insertions, 30 deletions
diff --git a/swfdec/Makefile.am b/swfdec/Makefile.am
index 5a0e33b6..38d49907 100644
--- a/swfdec/Makefile.am
+++ b/swfdec/Makefile.am
@@ -140,6 +140,7 @@ libswfdec_source_files = \
swfdec_rtmp_rpc_channel.c \
swfdec_rtmp_socket.c \
swfdec_rtmp_socket_rtmp.c \
+ swfdec_rtmp_stream.c \
swfdec_rtmp_video_channel.c \
swfdec_sandbox.c \
swfdec_script.c \
@@ -340,6 +341,7 @@ noinst_HEADERS = \
swfdec_rtmp_rpc_channel.h \
swfdec_rtmp_socket.h \
swfdec_rtmp_socket_rtmp.h \
+ swfdec_rtmp_stream.h \
swfdec_rtmp_video_channel.h \
swfdec_sandbox.h \
swfdec_script_internal.h \
diff --git a/swfdec/swfdec_rtmp_connection.c b/swfdec/swfdec_rtmp_connection.c
index 300842b4..e5042de3 100644
--- a/swfdec/swfdec_rtmp_connection.c
+++ b/swfdec/swfdec_rtmp_connection.c
@@ -32,10 +32,36 @@
#include "swfdec_rtmp_handshake_channel.h"
#include "swfdec_rtmp_rpc_channel.h"
#include "swfdec_rtmp_socket.h"
+#include "swfdec_rtmp_stream.h"
+
+/*** SwfdecRtmpStream ***/
+
+static void
+swfdec_rtmp_connection_rtmp_stream_receive (SwfdecRtmpStream *stream,
+ const SwfdecRtmpHeader *header, SwfdecBuffer *buffer)
+{
+ SwfdecRtmpConnection *conn = SWFDEC_RTMP_CONNECTION (stream);
+ SwfdecRtmpChannel *channel = swfdec_rtmp_connection_get_channel (conn, header->channel);
+ SwfdecRtmpChannelClass *klass;
+
+ if (channel == NULL) {
+ SWFDEC_FIXME ("woot, no channel %u", header->channel);
+ return;
+ }
+ klass = SWFDEC_RTMP_CHANNEL_GET_CLASS (channel);
+ klass->receive (channel, header, buffer);
+}
+
+static void
+swfdec_rtmp_connection_rtmp_stream_init (SwfdecRtmpStreamInterface *iface)
+{
+ iface->receive = swfdec_rtmp_connection_rtmp_stream_receive;
+}
/*** SwfdecRtmpConnection ***/
-G_DEFINE_TYPE (SwfdecRtmpConnection, swfdec_rtmp_connection, SWFDEC_TYPE_AS_RELAY)
+G_DEFINE_TYPE_WITH_CODE (SwfdecRtmpConnection, swfdec_rtmp_connection, SWFDEC_TYPE_AS_RELAY,
+ G_IMPLEMENT_INTERFACE (SWFDEC_TYPE_RTMP_STREAM, swfdec_rtmp_connection_rtmp_stream_init))
static void
swfdec_rtmp_connection_mark (SwfdecGcObject *object)
@@ -64,6 +90,15 @@ swfdec_rtmp_connection_dispose (GObject *object)
g_free (conn->error);
conn->error = NULL;
+ if (conn->incoming) {
+ g_hash_table_destroy (conn->incoming);
+ conn->incoming = NULL;
+ }
+ if (conn->streams) {
+ g_hash_table_destroy (conn->streams);
+ conn->streams = NULL;
+ }
+
G_OBJECT_CLASS (swfdec_rtmp_connection_parent_class)->dispose (object);
}
@@ -81,8 +116,14 @@ swfdec_rtmp_connection_class_init (SwfdecRtmpConnectionClass *klass)
static void
swfdec_rtmp_connection_init (SwfdecRtmpConnection *conn)
{
+ conn->incoming = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL, (GDestroyNotify) swfdec_rtmp_packet_free);
+ conn->streams = g_hash_table_new (g_direct_hash, g_direct_equal);
+
conn->read_size = SWFDEC_RTMP_BLOCK_SIZE;
conn->write_size = SWFDEC_RTMP_BLOCK_SIZE;
+
+ swfdec_rtmp_register_stream (conn, 0, SWFDEC_RTMP_STREAM (conn));
}
void
@@ -197,3 +238,28 @@ swfdec_rtmp_connection_get_channel (SwfdecRtmpConnection *conn, guint id)
return NULL;
}
+void
+swfdec_rtmp_register_stream (SwfdecRtmpConnection *conn,
+ guint id, SwfdecRtmpStream *stream)
+{
+ g_return_if_fail (SWFDEC_IS_RTMP_CONNECTION (conn));
+ g_return_if_fail (SWFDEC_IS_RTMP_STREAM (stream));
+
+ if (g_hash_table_lookup (conn->streams, GUINT_TO_POINTER (id))) {
+ SWFDEC_FIXME ("stream %u is already registered, ignoring new request",
+ id);
+ return;
+ }
+
+ g_hash_table_insert (conn->streams, GUINT_TO_POINTER (id), stream);
+}
+
+void
+swfdec_rtmp_unregister_stream (SwfdecRtmpConnection *conn, guint id)
+{
+ g_return_if_fail (SWFDEC_IS_RTMP_CONNECTION (conn));
+
+ if (!g_hash_table_remove (conn->streams, GUINT_TO_POINTER (id))) {
+ g_assert_not_reached ();
+ }
+}
diff --git a/swfdec/swfdec_rtmp_connection.h b/swfdec/swfdec_rtmp_connection.h
index f2944175..eb379589 100644
--- a/swfdec/swfdec_rtmp_connection.h
+++ b/swfdec/swfdec_rtmp_connection.h
@@ -56,6 +56,8 @@ struct _SwfdecRtmpConnection {
GList * last_send; /* list entry of last channel sent to */
SwfdecRtmpChannel * handshake; /* channel used for doing initial handshake or NULL */
char * error; /* NULL or debug string for error message */
+ GHashTable * incoming; /* channel id => incoming packets */
+ GHashTable * streams; /* stream id => stream */
guint read_size; /* size of a block of data when reading */
guint write_size; /* size of a block of data when writing */
@@ -85,6 +87,12 @@ void swfdec_rtmp_connection_errorv (SwfdecRtmpConnection * conn,
void swfdec_rtmp_connection_on_status (SwfdecRtmpConnection * conn,
SwfdecAsValue value);
+void swfdec_rtmp_register_stream (SwfdecRtmpConnection * conn,
+ guint id,
+ SwfdecRtmpStream * stream);
+void swfdec_rtmp_unregister_stream (SwfdecRtmpConnection * conn,
+ guint id);
+
#define swfdec_rtmp_connection_get_handshake_channel(conn) ((conn)->handshake)
#define swfdec_rtmp_connection_get_command_channel(conn) (swfdec_rtmp_connection_get_channel (conn, 2))
#define swfdec_rtmp_connection_get_rpc_channel(conn) (swfdec_rtmp_connection_get_channel (conn, 3))
diff --git a/swfdec/swfdec_rtmp_header.c b/swfdec/swfdec_rtmp_header.c
index cb3f6bfc..b0836b23 100644
--- a/swfdec/swfdec_rtmp_header.c
+++ b/swfdec/swfdec_rtmp_header.c
@@ -23,6 +23,8 @@
#include "swfdec_rtmp_header.h"
+#include <string.h>
+
gsize
swfdec_rtmp_header_peek_size (guint first_byte)
{
@@ -51,7 +53,7 @@ swfdec_rtmp_header_invalidate (SwfdecRtmpHeader *header)
{
g_return_if_fail (header != NULL);
- header->channel = (guint) -1;
+ memset (header, 0, sizeof (SwfdecRtmpHeader));
}
void
diff --git a/swfdec/swfdec_rtmp_packet.c b/swfdec/swfdec_rtmp_packet.c
index 58a0a9ea..ae3d1332 100644
--- a/swfdec/swfdec_rtmp_packet.c
+++ b/swfdec/swfdec_rtmp_packet.c
@@ -24,6 +24,12 @@
#include "swfdec_rtmp_packet.h"
SwfdecRtmpPacket *
+swfdec_rtmp_packet_new_empty (void)
+{
+ return g_slice_new0 (SwfdecRtmpPacket);
+}
+
+SwfdecRtmpPacket *
swfdec_rtmp_packet_new (SwfdecRtmpPacketType type, guint timestamp,
SwfdecBuffer *buffer)
{
@@ -31,7 +37,7 @@ swfdec_rtmp_packet_new (SwfdecRtmpPacketType type, guint timestamp,
g_return_val_if_fail (buffer != NULL, NULL);
- packet = g_slice_new0 (SwfdecRtmpPacket);
+ packet = swfdec_rtmp_packet_new_empty ();
packet->header.type = type;
packet->header.timestamp = timestamp;
packet->buffer = swfdec_buffer_ref (buffer);
@@ -44,7 +50,8 @@ swfdec_rtmp_packet_free (SwfdecRtmpPacket *packet)
{
g_return_if_fail (packet != NULL);
- swfdec_buffer_unref (packet->buffer);
+ if (packet->buffer)
+ swfdec_buffer_unref (packet->buffer);
g_slice_free (SwfdecRtmpPacket, packet);
}
diff --git a/swfdec/swfdec_rtmp_packet.h b/swfdec/swfdec_rtmp_packet.h
index 3f83ca71..96482f04 100644
--- a/swfdec/swfdec_rtmp_packet.h
+++ b/swfdec/swfdec_rtmp_packet.h
@@ -33,6 +33,7 @@ struct _SwfdecRtmpPacket {
SwfdecBuffer * buffer; /* contents of packet */
};
+SwfdecRtmpPacket * swfdec_rtmp_packet_new_empty (void);
SwfdecRtmpPacket * swfdec_rtmp_packet_new (SwfdecRtmpPacketType type,
guint timestamp,
SwfdecBuffer * buffer);
diff --git a/swfdec/swfdec_rtmp_socket.c b/swfdec/swfdec_rtmp_socket.c
index 955eb608..5232f4ba 100644
--- a/swfdec/swfdec_rtmp_socket.c
+++ b/swfdec/swfdec_rtmp_socket.c
@@ -23,9 +23,12 @@
#include "swfdec_rtmp_socket.h"
+#include <string.h>
+
#include "swfdec_debug.h"
#include "swfdec_rtmp_handshake_channel.h"
#include "swfdec_player_internal.h"
+#include "swfdec_rtmp_stream.h"
/* socket implementations for swfdec_rtmp_socket_new() */
#include "swfdec_rtmp_socket_rtmp.h"
@@ -147,7 +150,7 @@ void
swfdec_rtmp_socket_receive (SwfdecRtmpSocket *sock, SwfdecBufferQueue *queue)
{
SwfdecRtmpConnection *conn;
- SwfdecRtmpChannel *channel;
+ SwfdecRtmpPacket *packet;
SwfdecRtmpHeader header;
SwfdecBuffer *buffer;
SwfdecBits bits;
@@ -181,43 +184,57 @@ swfdec_rtmp_socket_receive (SwfdecRtmpSocket *sock, SwfdecBufferQueue *queue)
break;
swfdec_bits_init (&bits, buffer);
i = swfdec_rtmp_header_peek_channel (&bits);
- channel = swfdec_rtmp_connection_get_channel (conn, i);
- if (channel == NULL) {
- swfdec_rtmp_connection_error (conn,
- "message on unknown channel %u, what now?", i);
- return;
+ packet = g_hash_table_lookup (conn->incoming, GUINT_TO_POINTER (i));
+ if (packet) {
+ swfdec_rtmp_header_copy (&header, &packet->header);
+ } else {
+ swfdec_rtmp_header_invalidate (&header);
}
- if (header_size >= 4 && swfdec_buffer_queue_get_depth (channel->recv_queue)) {
- SWFDEC_ERROR ("not a continuation header, but old command not finished yet, dropping old command");
- swfdec_buffer_queue_flush (channel->recv_queue, swfdec_buffer_queue_get_depth (channel->recv_queue));
- }
- swfdec_rtmp_header_copy (&header, &channel->recv_cache);
swfdec_rtmp_header_read (&header, &bits);
swfdec_buffer_unref (buffer);
/* read the data chunk */
- remaining = header.size - swfdec_buffer_queue_get_depth (channel->recv_queue);
+ remaining = header.size;
+ if (packet && packet->buffer)
+ remaining -= packet->buffer->length;
remaining = MIN (remaining, conn->read_size);
if (header_size + remaining > swfdec_buffer_queue_get_depth (queue))
return;
+
+ if (packet == NULL) {
+ packet = swfdec_rtmp_packet_new_empty ();
+ g_hash_table_insert (conn->incoming, GUINT_TO_POINTER (i), packet);
+ } else if (header_size >= 4 && packet->buffer != NULL) {
+ SWFDEC_ERROR ("not a continuation header, but old command not finished yet, dropping old command");
+ swfdec_buffer_unref (packet->buffer);
+ packet->buffer = NULL;
+ }
+ if (packet->buffer == NULL) {
+ packet->buffer = swfdec_buffer_new (header.size);
+ /* we store the actual size of the buffer in packet->header.size, and
+ * use length to count how much data we already received */
+ packet->buffer->length = 0;
+ }
+ swfdec_rtmp_header_copy (&packet->header, &header);
+
swfdec_buffer_queue_flush (queue, header_size);
buffer = swfdec_buffer_queue_pull (queue, remaining);
g_assert (buffer);
- swfdec_buffer_queue_push (channel->recv_queue, buffer);
- swfdec_rtmp_header_copy (&channel->recv_cache, &header);
-
- /* process the buffer if it's received completely */
- buffer = swfdec_buffer_queue_pull (channel->recv_queue, header.size);
- if (buffer) {
- SwfdecRtmpChannelClass *klass = SWFDEC_RTMP_CHANNEL_GET_CLASS (channel);
-
- g_assert (swfdec_buffer_queue_get_depth (channel->recv_queue) == 0);
- if (header.stream != channel->stream_id) {
- SWFDEC_FIXME ("channel has stream id %u, but message has stream id %u, is this bad?",
- channel->stream_id, header.stream);
+ /* we allocate the buffer so it's big enough */
+ memcpy (packet->buffer->data + packet->buffer->length, buffer->data, remaining);
+ packet->buffer->length += remaining;
+ swfdec_buffer_unref (buffer);
+
+ if (packet->buffer->length == header.size) {
+ SwfdecRtmpStream *stream = g_hash_table_lookup (conn->streams, GUINT_TO_POINTER (header.stream));
+
+ if (stream) {
+ swfdec_rtmp_stream_receive (stream, packet);
+ } else {
+ SWFDEC_FIXME ("packet (type %u) for unknown stream %u", header.type, header.stream);
}
- klass->receive (channel, &header, buffer);
- swfdec_buffer_unref (buffer);
+ swfdec_buffer_unref (packet->buffer);
+ packet->buffer = NULL;
}
} while (TRUE);
}