summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Otte <otte@gnome.org>2008-11-21 11:07:25 +0100
committerBenjamin Otte <otte@gnome.org>2008-11-21 11:07:25 +0100
commitdec715d31f54dad1557e7142ed5debed82b78c90 (patch)
tree2b16af9a5e8807f515c70f9bce19110b44a14080
parent3deb6222cb5fc092e9838c598592e019aaa4b494 (diff)
redo socket writing API
previously the SwfdecSocket was responsible for keeping a queue of data to be written. Now the queue is supposed to be kept inside Swfdec. This has 2 advantages: a) The API maps nicely to async I/O, the code in external apps can be reduced. b) The RTMP code can assemble data on demand and does not have to queue all of the data in random order inside the socket. This will liekly only effect the written data on clogged networks.
-rw-r--r--swfdec-gtk/swfdec_gtk_socket.c66
-rw-r--r--swfdec/swfdec_loader_internal.h2
-rw-r--r--swfdec/swfdec_socket.c29
-rw-r--r--swfdec/swfdec_socket.h4
-rw-r--r--swfdec/swfdec_stream.c33
-rw-r--r--swfdec/swfdec_stream_target.c15
-rw-r--r--swfdec/swfdec_stream_target.h4
-rw-r--r--swfdec/swfdec_xml_socket.c41
-rw-r--r--swfdec/swfdec_xml_socket.h1
9 files changed, 136 insertions, 59 deletions
diff --git a/swfdec-gtk/swfdec_gtk_socket.c b/swfdec-gtk/swfdec_gtk_socket.c
index 13b22721..3a902d77 100644
--- a/swfdec-gtk/swfdec_gtk_socket.c
+++ b/swfdec-gtk/swfdec_gtk_socket.c
@@ -49,8 +49,6 @@ struct _SwfdecGtkSocket
SwfdecSocket socket;
SoupSocket * sock; /* libsoup socket we're using */
- gboolean sock_writable; /* FALSE if writing would block */
- SwfdecBufferQueue * queue; /* buffers we still need to push */
};
struct _SwfdecGtkSocketClass {
@@ -110,36 +108,9 @@ swfdec_gtk_socket_do_read (SoupSocket *sock, SwfdecGtkSocket *gtk)
}
static void
-swfdec_gtk_socket_do_write (SoupSocket *sock, SwfdecGtkSocket *gtk)
+swfdec_gtk_socket_writable (SoupSocket *sock, SwfdecGtkSocket *gtk)
{
- SwfdecBuffer *buffer;
- SoupSocketIOStatus status;
- GError *error = NULL;
- gsize len;
-
- gtk->sock_writable = TRUE;
- while ((buffer = swfdec_buffer_queue_peek_buffer (gtk->queue))) {
- status = soup_socket_write (sock, buffer->data, buffer->length,
- &len, NULL, &error);
- swfdec_buffer_unref (buffer);
- switch (status) {
- case SOUP_SOCKET_OK:
- buffer = swfdec_buffer_queue_pull (gtk->queue, len);
- swfdec_buffer_unref (buffer);
- break;
- case SOUP_SOCKET_WOULD_BLOCK:
- case SOUP_SOCKET_EOF:
- gtk->sock_writable = FALSE;
- break;
- case SOUP_SOCKET_ERROR:
- swfdec_stream_error (SWFDEC_STREAM (gtk), "%s", error->message);
- g_error_free (error);
- return;
- default:
- g_warning ("unhandled status code %u from soup_socket_read()", (guint) status);
- break;
- }
- };
+ swfdec_socket_signal_writable (SWFDEC_SOCKET (gtk));
}
static void
@@ -171,18 +142,34 @@ swfdec_gtk_socket_connect (SwfdecSocket *sock_, SwfdecPlayer *player,
g_signal_connect (sock->sock, "readable",
G_CALLBACK (swfdec_gtk_socket_do_read), sock);
g_signal_connect (sock->sock, "writable",
- G_CALLBACK (swfdec_gtk_socket_do_write), sock);
+ G_CALLBACK (swfdec_gtk_socket_writable), sock);
soup_socket_connect_async (sock->sock, NULL, swfdec_gtk_socket_do_connect, sock);
}
-static void
+static gsize
swfdec_gtk_socket_send (SwfdecSocket *sock, SwfdecBuffer *buffer)
{
SwfdecGtkSocket *gtk = SWFDEC_GTK_SOCKET (sock);
+ SoupSocketIOStatus status;
+ GError *error = NULL;
+ gsize len;
- swfdec_buffer_queue_push (gtk->queue, buffer);
- if (gtk->sock_writable)
- swfdec_gtk_socket_do_write (gtk->sock, gtk);
+ status = soup_socket_write (gtk->sock, buffer->data, buffer->length,
+ &len, NULL, &error);
+ switch (status) {
+ case SOUP_SOCKET_OK:
+ case SOUP_SOCKET_WOULD_BLOCK:
+ case SOUP_SOCKET_EOF:
+ break;
+ case SOUP_SOCKET_ERROR:
+ swfdec_stream_error (SWFDEC_STREAM (gtk), "%s", error->message);
+ g_error_free (error);
+ return 0;
+ default:
+ g_warning ("unhandled status code %u from soup_socket_read()", (guint) status);
+ break;
+ }
+ return len;
}
static void
@@ -196,10 +183,7 @@ swfdec_gtk_socket_dispose (GObject *object)
g_object_unref (gtk->sock);
gtk->sock = NULL;
}
- if (gtk->queue) {
- swfdec_buffer_queue_unref (gtk->queue);
- gtk->queue = NULL;
- }
+
G_OBJECT_CLASS (swfdec_gtk_socket_parent_class)->dispose (object);
}
@@ -221,7 +205,5 @@ swfdec_gtk_socket_class_init (SwfdecGtkSocketClass *klass)
static void
swfdec_gtk_socket_init (SwfdecGtkSocket *gtk)
{
- gtk->sock_writable = TRUE;
- gtk->queue = swfdec_buffer_queue_new ();
}
diff --git a/swfdec/swfdec_loader_internal.h b/swfdec/swfdec_loader_internal.h
index 421ac96e..47cdb903 100644
--- a/swfdec/swfdec_loader_internal.h
+++ b/swfdec/swfdec_loader_internal.h
@@ -37,7 +37,7 @@ void swfdec_loader_set_data_type (SwfdecLoader * loader,
SwfdecLoaderDataType type);
/* swfdec_socket.c */
-void swfdec_socket_send (SwfdecSocket * sock,
+gsize swfdec_socket_send (SwfdecSocket * sock,
SwfdecBuffer * buffer);
diff --git a/swfdec/swfdec_socket.c b/swfdec/swfdec_socket.c
index 64de43f7..e180f49d 100644
--- a/swfdec/swfdec_socket.c
+++ b/swfdec/swfdec_socket.c
@@ -55,9 +55,11 @@
* port. If you encounter an error, call swfdec_stream_error(), but
* still make sure the socket object does not break.
* @send: Called to send data down the given socket. This function will only be
- * called when the socket is open. You get passed a reference to the
- * buffer, so it is your responsibility to call swfdec_buffer_unref() on
- * it when you are done with it.
+ * called when the socket is open. The function is supposed to write as
+ * much data as possible to the socket and return the amount of data
+ * written. If not all data could be written, the socket is assumed to
+ * be full and no attempt at writing to it will be made until you call
+ * swfdec_socket_signal_writable().
*
* This is the socket class. When you create a subclass, you need to implement
* the functions listed above.
@@ -74,10 +76,10 @@ swfdec_socket_do_connect (SwfdecSocket *socket, SwfdecPlayer *player,
swfdec_stream_error (SWFDEC_STREAM (socket), "no socket implementation exists");
}
-static void
+static gsize
swfdec_socket_do_send (SwfdecSocket *socket, SwfdecBuffer *buffer)
{
- swfdec_buffer_unref (buffer);
+ return 0;
}
static const char *
@@ -106,20 +108,23 @@ swfdec_socket_init (SwfdecSocket *socket)
/**
* swfdec_socket_send:
* @sock: a #SwfdecSocket
- * @buffer: data to send to the stream
+ * @buffer: data to send to the stream, no reference will be taken.
*
- * Pushes the given @buffer down the stream.
+ * Tries to push the data of @buffer down the stream. If all of the data could
+ * be sent, @buffer->length will be returned. Otherwise the amount of data
+ * written will be returned and when more data can be written, the stream
+ * target's writable vfunc will be called.
**/
-void
+gsize
swfdec_socket_send (SwfdecSocket *sock, SwfdecBuffer *buffer)
{
SwfdecSocketClass *klass;
- g_return_if_fail (SWFDEC_IS_SOCKET (sock));
- g_return_if_fail (swfdec_stream_is_open (SWFDEC_STREAM (sock)));
- g_return_if_fail (buffer != NULL);
+ g_return_val_if_fail (SWFDEC_IS_SOCKET (sock), 0);
+ g_return_val_if_fail (swfdec_stream_is_open (SWFDEC_STREAM (sock)), 0);
+ g_return_val_if_fail (buffer != NULL, 0);
klass = SWFDEC_SOCKET_GET_CLASS (sock);
- klass->send (sock, buffer);
+ return klass->send (sock, buffer);
}
diff --git a/swfdec/swfdec_socket.h b/swfdec/swfdec_socket.h
index 8a4eb2cd..fdc5293b 100644
--- a/swfdec/swfdec_socket.h
+++ b/swfdec/swfdec_socket.h
@@ -52,12 +52,14 @@ struct _SwfdecSocketClass
const char * hostname,
guint port);
- void (* send) (SwfdecSocket * socket,
+ gsize (* send) (SwfdecSocket * socket,
SwfdecBuffer * buffer);
};
GType swfdec_socket_get_type (void);
+void swfdec_socket_signal_writable (SwfdecSocket * sock);
+
G_END_DECLS
#endif
diff --git a/swfdec/swfdec_stream.c b/swfdec/swfdec_stream.c
index c44052fc..ef32bbe7 100644
--- a/swfdec/swfdec_stream.c
+++ b/swfdec/swfdec_stream.c
@@ -470,3 +470,36 @@ swfdec_stream_close (SwfdecStream *stream)
swfdec_stream_queue_processing (stream);
}
+/* FIXME: put in right file */
+static void
+swfdec_socket_process_writable (gpointer streamp, gpointer unused)
+{
+ SwfdecStream *stream = streamp;
+ SwfdecStreamPrivate *priv = stream->priv;
+
+ g_assert (priv->target);
+
+ swfdec_stream_target_writable (priv->target, stream);
+}
+
+/**
+ * swfdec_socket_signal_writable:
+ * @sock: the socket that has become writable
+ *
+ * Signals to Swfdec that it should try writing to the given socket again.
+ **/
+void
+swfdec_socket_signal_writable (SwfdecSocket *sock)
+{
+ SwfdecStreamPrivate *priv;
+
+ g_return_if_fail (SWFDEC_IS_SOCKET (sock));
+
+ priv = SWFDEC_STREAM (sock)->priv;
+ if (priv->target) {
+ g_assert (priv->player);
+ swfdec_player_add_external_action (priv->player, sock,
+ swfdec_socket_process_writable, NULL);
+ }
+}
+
diff --git a/swfdec/swfdec_stream_target.c b/swfdec/swfdec_stream_target.c
index 4b96ef0d..a7799d50 100644
--- a/swfdec/swfdec_stream_target.c
+++ b/swfdec/swfdec_stream_target.c
@@ -139,3 +139,18 @@ swfdec_stream_target_error (SwfdecStreamTarget *target, SwfdecStream *stream)
iface->error (target, stream);
}
+void
+swfdec_stream_target_writable (SwfdecStreamTarget *target, SwfdecStream *stream)
+{
+ SwfdecStreamTargetInterface *iface;
+
+ g_return_if_fail (SWFDEC_IS_STREAM_TARGET (target));
+ g_return_if_fail (SWFDEC_IS_STREAM (stream));
+
+ SWFDEC_LOG ("writable on %s", swfdec_stream_describe (stream));
+
+ iface = SWFDEC_STREAM_TARGET_GET_INTERFACE (target);
+ if (iface->writable)
+ iface->writable (target, stream);
+}
+
diff --git a/swfdec/swfdec_stream_target.h b/swfdec/swfdec_stream_target.h
index 5b50bb2b..546f6c92 100644
--- a/swfdec/swfdec_stream_target.h
+++ b/swfdec/swfdec_stream_target.h
@@ -47,6 +47,8 @@ struct _SwfdecStreamTargetInterface {
SwfdecStream * stream);
void (* error) (SwfdecStreamTarget * target,
SwfdecStream * stream);
+ void (* writable) (SwfdecStreamTarget * target,
+ SwfdecStream * stream);
};
GType swfdec_stream_target_get_type (void) G_GNUC_CONST;
@@ -60,6 +62,8 @@ void swfdec_stream_target_close (SwfdecStreamTarget * target,
SwfdecStream * stream);
void swfdec_stream_target_error (SwfdecStreamTarget * target,
SwfdecStream * stream);
+void swfdec_stream_target_writable (SwfdecStreamTarget * target,
+ SwfdecStream * stream);
G_END_DECLS
diff --git a/swfdec/swfdec_xml_socket.c b/swfdec/swfdec_xml_socket.c
index a55ac96e..f637563b 100644
--- a/swfdec/swfdec_xml_socket.c
+++ b/swfdec/swfdec_xml_socket.c
@@ -40,6 +40,7 @@ swfdec_xml_socket_ensure_closed (SwfdecXmlSocket *xml)
if (xml->socket == NULL)
return;
+ swfdec_buffer_queue_clear (xml->send_queue);
swfdec_stream_set_target (SWFDEC_STREAM (xml->socket), NULL);
g_object_unref (xml->socket);
xml->socket = NULL;
@@ -146,6 +147,30 @@ swfdec_xml_socket_stream_target_close (SwfdecStreamTarget *target,
}
static void
+swfdec_xml_socket_do_write (SwfdecXmlSocket *xml)
+{
+ SwfdecBuffer *buffer;
+ gsize written, length;
+
+ do {
+ buffer = swfdec_buffer_queue_peek_buffer (xml->send_queue);
+ if (buffer == NULL)
+ break;
+ length = buffer->length;
+ written = swfdec_socket_send (xml->socket, buffer);
+ swfdec_buffer_unref (buffer);
+ swfdec_buffer_queue_flush (xml->send_queue, written);
+ } while (written == length);
+}
+
+static void
+swfdec_xml_socket_stream_target_writable (SwfdecStreamTarget *target,
+ SwfdecStream *stream)
+{
+ swfdec_xml_socket_do_write (SWFDEC_XML_SOCKET (target));
+}
+
+static void
swfdec_xml_socket_stream_target_init (SwfdecStreamTargetInterface *iface)
{
iface->get_player = swfdec_xml_socket_stream_target_get_player;
@@ -153,6 +178,7 @@ swfdec_xml_socket_stream_target_init (SwfdecStreamTargetInterface *iface)
iface->parse = swfdec_xml_socket_stream_target_parse;
iface->close = swfdec_xml_socket_stream_target_close;
iface->error = swfdec_xml_socket_stream_target_error;
+ iface->writable = swfdec_xml_socket_stream_target_writable;
}
/*** SWFDEC_XML_SOCKET ***/
@@ -181,6 +207,10 @@ swfdec_xml_socket_dispose (GObject *object)
swfdec_buffer_queue_unref (xml->queue);
xml->queue = NULL;
}
+ if (xml->send_queue) {
+ swfdec_buffer_queue_unref (xml->send_queue);
+ xml->send_queue = NULL;
+ }
G_OBJECT_CLASS (swfdec_xml_socket_parent_class)->dispose (object);
}
@@ -200,6 +230,7 @@ static void
swfdec_xml_socket_init (SwfdecXmlSocket *xml)
{
xml->queue = swfdec_buffer_queue_new ();
+ xml->send_queue = swfdec_buffer_queue_new ();
}
static SwfdecXmlSocket *
@@ -297,10 +328,14 @@ swfdec_xml_socket_send (SwfdecAsContext *cx, SwfdecAsObject *object,
}
len = strlen (send) + 1;
- buf = swfdec_buffer_new (len);
- memcpy (buf->data, send, len);
+ buf = swfdec_buffer_new_for_data (g_memdup (send, len), len);
- swfdec_socket_send (xml->socket, buf);
+ if (swfdec_buffer_queue_get_depth (xml->send_queue) == 0) {
+ swfdec_buffer_queue_push (xml->send_queue, buf);
+ swfdec_xml_socket_do_write (xml);
+ } else {
+ swfdec_buffer_queue_push (xml->send_queue, buf);
+ }
}
SWFDEC_AS_NATIVE (400, 2, swfdec_xml_socket_close)
diff --git a/swfdec/swfdec_xml_socket.h b/swfdec/swfdec_xml_socket.h
index cb1afbce..2c9e5eeb 100644
--- a/swfdec/swfdec_xml_socket.h
+++ b/swfdec/swfdec_xml_socket.h
@@ -45,6 +45,7 @@ struct _SwfdecXmlSocket {
gboolean open; /* the socket has been opened already */
SwfdecBufferQueue * queue; /* everything that belongs to the same string */
SwfdecAsObject * target; /* target object we call out to */
+ SwfdecBufferQueue * send_queue; /* queue of data still to be sent */
};
struct _SwfdecXmlSocketClass {